MoTLab -GO Inc. Engineering Blog-MoTLab -GO Inc. Engineering Blog-

Looker APIでスケジュール配信を自動リトライ

Lookerデータ基盤
September 09, 2020

はじめまして、AI技術開発部分析グループの伊田です。

MoTではタクシー配車アプリのKPIなどを筆頭にBIツール「Looker」でレポートを作成し、Slackに日々配信しています。この時、稀にSlack配信が失敗する場合があります。Lookerにはリトライ機構がないため配信が失敗した場合は障害対応として手動でレポートを再配信する運用が発生します。

本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介したいと思います。


記事の構成

本記事は下記の流れでスケジュール配信の自動リトライ手順を説明します。

  • スケジュール配信の自動リトライ概要
  • 配信失敗ジョブの取得設定(Look)
  • Looker APIによる「スケジュール配信の自動リトライ」の実装

スケジュール配信の自動リトライ概要

自動リトライ手順

スケジュール配信が失敗したジョブを自動リトライする方法について、Lookerコミュニティで対応方法が回答されています。

https://discourse.looker.com/t/schedule/15591

それによると

  1. 配信が失敗したジョブを抽出するLookの作成
  2. Lookデータ取得APIで1から失敗したスケジュールのScheduled Plan IDを取得
  3. 2で取得したScheduled Plan IDに対してAPIのrun_onceを実行する

となります。

※ Look = 1帳票の単位と思って頂ければ差し支えありません

※ Scheduled Plan ID = 配信設定した際にLookerが裏側で持つスケジュール配信IDです

この対応方法に則って自動リトライを構築していきます。

尚、1, 2 はLookerのシステム情報を参照する必要があるためLookerの「see_system_activity」権限が必要です(Developer権限に内包されている)。つまりAPI実行ユーザーにも「see_system_activity」権限が必要です。

基盤構成

1はLookerの機能そのものですが、2, 3はPythonで実装し、ワークフローエンジンであるAirflowを用いて定期実行するようにしたいと思います。

分析基盤全体の流れとしては下記です。

  1. (0時過ぎ) Airflowで夜間に日次バッチ処理を実行して分析基盤を更新する
  2. (9時までに順次配信) Lookerで作成したレポートをSlackにレポート配信する
  3. (10時起動) Airflowで配信が失敗したジョブをLooker APIを利用して抽出し、失敗したジョブが存在すればスケジュール配信APIを実行する

※本記事ではAirflowの説明はしません

An image from Notion

配信失敗ジョブの取得設定(Look)

配信失敗ジョブを取得するためのLookを作成します。

Explore > System Activity > Scheduled Plan を選択します。

An image from Notion

Explore画面で配信失敗ジョブを取得するための設定を行います。

An image from Notion

まずは、Scheduled job や Scheduled Plan からどんなデータが取得できるか確認します。

DIMENSIONSから

  1. Scheduled job.Created Time(配信ジョブ実行日時)
  2. Scheduled job.Finalized Time(配信ジョブ完了日時)
  3. Scheduled Plan.ID(スケジュール配信ID)
  4. Scheduled job.ID(スケジュールジョブID)
  5. Scheduled job.Status(配信ジョブ実行結果)
  6. Scheduled Plan.Run Once(Yes: 1回だけ送信 / テスト配信 No: スケジュール配信)
  7. Scheduled job.Name(配信ジョブ名) ※業務関連の名称が設定されているため黒塗りにしています
  8. Scheduled Plan.Dashboard ID(Dashboardに配信設定している場合)
  9. Scheduled Plan.Look ID(Lookに配信設定をしている場合)

を選択します。

An image from Notion

どういったデータが取得できるか肌感覚がつかめたので実際にAPI用のLookに落としこんでいきます。

DIMENSIONSから

  1. Scheduled job.Created Time(配信ジョブ実行日時)
  2. Scheduled Plan.ID(スケジュール配信ID)
  3. Scheduled job.Name(配信ジョブ名)

を選択します。

次にFILTERSで

  1. Scheduled job.Created Time 「が過去」 「10」 「時間」 (10時起動なので分析基盤更新タイミングである0時から10時までの配信失敗ジョブのみに絞る)
  2. Scheduled Plan.Run Once 「が次である」 「No」 (スケジュール配信のみ)
  3. Scheduled job.Status 「が次の値に等しい(=) 」 「failure」 (配信ジョブ実行結果が失敗のみ)

を設定します。

An image from Notion

Lookを保存後、APIを実行するときにLookIDが必要なので控えておきます。

弊社環境では look_id = 442 です。

※LookIDはURL部分で確認できます

An image from Notion

Looker APIの実装

Looker APIの処理を実装するにあたり、いきなり実装に入るのではなくLooker APIの紹介、APIキーの取得方法を確認した上で実装していきます。流れとしては下記の通りです。

  • Looker APIの紹介
  • Looker APIキーの取得
  • ディレクトリ構成
  • Looker API共通処理の実装
  • Looker APIによる「スケジュール配信の自動リトライ」の実装

Looker APIの紹介

APIを実装する前にLooker APIを簡単に紹介させて頂きます。不要な方は飛ばしてください。

公式ドキュメント

https://docs.looker.com/reference/api-and-integration/api-reference/v3.1

Lookからデータを取得する[Run Look]

https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#run_look

スケジュール配信再送[Run Scheduled Plan Once by Id]

https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/scheduled-plan#run_scheduled_plan_once_by_id

ドキュメントだけでなく実際にAPIを試す環境もあります。コード実装せずにAPIを試せるのでとても助かります。

https://your-base-url.looker.com:19999/api-docs/index.html

An image from Notion

Run Look を試す場合は SDK run_look(look_id, result_format) から look_id = 442, json をセットして Try it out! ボタンを押下すればすぐに結果が返ってきます。Python実装としては、 run_lookで得られた scheduled_plan_id をパラメータにして run_scheduled_plan_once_by_id を実行することになります。

An image from Notion

Looker APIキーの取得

APIを実装する前にLooker APIキーを取得します。

管理 > Users(Users) > Edit を選択します。

An image from Notion

API3 Keys Edit Keys を選択します。

An image from Notion

New API3 Key を選択してAPI Keyを発行します。

今後は、 Client IDClient Secret を利用してAPIを実装します。

An image from Notion

ディレクトリ構成

ディレクトリ構成は下記になります。

共通系処理を実装した後に本丸のスケジュール配信リトライ処理を実装していきます。

base_directory
 |-- looker
 |- looker_api.py : 共通系処理
 └-- looker_job_rerunner.py : スケジュール配信リトライ処理

最終的にAirflowのDAGから looker_job_rerunner.py をPythonOperatorで読み込みます。

Looker API共通処理の実装

looker_api.py ではログイン処理とリクエスト処理(get, post)を実装します。

※今回は取得(get)、新規作成(post)だけですが、必要に応じて更新(patch)、削除(delete)のリクエスト処理を実装してください

エンドポイント https://your-base-url.looker.com:19999/api/3.1/loginclient_idclient_secret を利用してログインした後に access_token を保持します。

client_idclient_secret はAirflowの管理画面から設定してPG上で取得するようにしています。

import requests
from airflow import models

headers = {'content-type': 'application/json'}

class LookerAPI():
    """
        https://docs.looker.com/reference/api-and-integration/api-reference/v3.1
        LookerAPIクラス
        ログイン処理および各種LookerAPIのエンドポイントへの接続
    """
    def __init__(self, base_url='https://your-base-url.looker.com:19999/api/3.1'):
        self.base_url = base_url
        self.base_params = {
            'verify_ssl': True,
            'timeout': 120
        }

        url = self.base_url + '/login'
        params = self.base_params.copy()
        params['client_id'] = models.Variable.get('looker_client_id')
        params['client_secret'] = models.Variable.get('looker_client_secret')

        response_json = self.post(url=url, params=params)

        if 'access_token' in response_json:
            self.base_params['access_token'] = response_json['access_token']
        else:
            # 後ろ4文字のみ残してマスク
            mask_client_secret = ('X' * (len(params['client_secret']) - 4)) + params['client_secret'][-4:]
            raise Exception(f"Looker API Login Failed client_id:{params['client_id']} / client_secret:{mask_client_secret}")

    def get(self, url, params) -> dict:
            response = requests.get(url=url, params=params, headers=headers)
    
            if response.status_code != 200:
                raise Exception(f'status code: {response.status_code} / {response.json()}')
    
            return response.json()

    def post(self, url, params, json={}) -> dict:
            response = requests.post(url=url, params=params, headers=headers, json=json)
    
            if response.status_code != 200:
                raise Exception(f'status code: {response.status_code} / {response.json()}')
    
            return response.json()

Looker APIによる「スケジュール配信の自動リトライ」の実装

looker_job_rerunner.py ではスケジュール配信の自動リトライ処理を実装します。

各エンドポイントは次の通りです。

Look取得

https://your-base-url.looker.com:19999/api/3.1/looks/:look_id/run/json

スケジュール配信

https://your-base-url.looker.com:19999/api/3.1/scheduled_plans/:scheduled_plan_id/run_once

尚、Look取得時に念の為キャッシュを破棄するパラメータを設定しています。

import logging
from base_directory.looker.looker_api import LookerAPI

logging.getLogger().setLevel(logging.INFO)

class LookerJobRerunner():
    """
        Lookerの定時スケジュール配信で失敗となったジョブを取得し、再実行するためのクラス
        制限として、ジョブの再実行まではするが再実行となったジョブが配信成功するかどうかはわからない
    """

    # look_id 442 はスケジュール配信が失敗したジョブを取得するための帳票ID
    def __init__(self, look_id=442):
        self.api = LookerAPI()
        self.look_id_for_fetch = look_id

    def _fetch_scheduled_plan(self) -> [dict]:
        url = f'{self.api.base_url}/looks/{self.look_id_for_fetch}/run/json'
        params = self.api.base_params.copy()
        params['cache'] = False

        return self.api.get(url=url, params=params)

    def _run_once_scheduled_plan(self, scheduled_plan_id):
        url = f'{self.api.base_url}/scheduled_plans/{scheduled_plan_id}/run_once'
        params = self.api.base_params.copy()

        self.api.post(url=url, params=params)

    def fetch_with_failure_and_retry(self):
        logging.info('fetch_scheduled_plan')
        scheduled_plan_dict_list = self._fetch_scheduled_plan()

        logging.info('run_once_scheduled_plan')
        for scheduled_plan_dict in scheduled_plan_dict_list:
            logging.info(scheduled_plan_dict)
            scheduled_plan_id = scheduled_plan_dict['scheduled_plan.id']
            # 必要に応じて scheduled_job.name も取得

            try:
                _ = self._run_once_scheduled_plan(scheduled_plan_id=scheduled_plan_id)
            except Exception as e:
                """
                    ここのExceptionに来るときは基本的に対処しようがないとき
                    スケジュール配信が失敗したあとで配信ジョブを一度削除して再作成したため対象の配信IDが見つからない
                    など
                """
                logging.error(f'scheduled_plan_id: {scheduled_plan_id}')
                logging.error(f'Exception: {e}')

        return 200

Airflowの実装については割愛しますが、以上の実装を日次10時に起動することで「スケジュール配信の自動リトライ」を実現しています。

実際に自動リトライされたシーン

9時30分ごろにいつもなら出ているレポートが出ていないと連絡を頂きました。

10時に「スケジュール配信の自動リトライ」ジョブが起動することで運用チームが障害対応をせずに済んでいます。

※メンションに気づいた時は既に再配信されていたという状況です

An image from Notion

まとめ

本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介しました。欲しい情報をLookであらかじめ定義しておき、Looker APIを利用してデータを取得、スケジュール配信を自動リトライする処理を実装しました。この記事をもとにLookerの運用負荷が少しでも軽減されれば幸いです。

また、Looker APIを応用すれば他にも色々なことができるので本記事が参考になればと思います。

Mobility Technologiesでは、Mobility領域で働きたい人、Mobilityの未来を一緒に考えてくれる人を募集しています!

https://hrmos.co/pages/mo-t/jobs