はじめまして、AI技術開発部分析グループの伊田です。
MoTではタクシー配車アプリのKPIなどを筆頭にBIツール「Looker」でレポートを作成し、Slackに日々配信しています。この時、稀にSlack配信が失敗する場合があります。Lookerにはリトライ機構がないため配信が失敗した場合は障害対応として手動でレポートを再配信する運用が発生します。
本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介したいと思います。
本記事は下記の流れでスケジュール配信の自動リトライ手順を説明します。
スケジュール配信が失敗したジョブを自動リトライする方法について、Lookerコミュニティで対応方法が回答されています。
https://discourse.looker.com/t/schedule/15591
それによると
となります。
※ Look = 1帳票の単位と思って頂ければ差し支えありません
※ Scheduled Plan ID = 配信設定した際にLookerが裏側で持つスケジュール配信IDです
この対応方法に則って自動リトライを構築していきます。
尚、1, 2 はLookerのシステム情報を参照する必要があるためLookerの「see_system_activity」権限が必要です(Developer権限に内包されている)。つまりAPI実行ユーザーにも「see_system_activity」権限が必要です。
1はLookerの機能そのものですが、2, 3はPythonで実装し、ワークフローエンジンであるAirflowを用いて定期実行するようにしたいと思います。
分析基盤全体の流れとしては下記です。
※本記事ではAirflowの説明はしません
配信失敗ジョブを取得するためのLookを作成します。
Explore > System Activity > Scheduled Plan を選択します。
Explore画面で配信失敗ジョブを取得するための設定を行います。
まずは、Scheduled job や Scheduled Plan からどんなデータが取得できるか確認します。
DIMENSIONSから
を選択します。
どういったデータが取得できるか肌感覚がつかめたので実際にAPI用のLookに落としこんでいきます。
DIMENSIONSから
を選択します。
次にFILTERSで
を設定します。
Lookを保存後、APIを実行するときにLookIDが必要なので控えておきます。
弊社環境では look_id = 442 です。
※LookIDはURL部分で確認できます
Looker APIの処理を実装するにあたり、いきなり実装に入るのではなくLooker APIの紹介、APIキーの取得方法を確認した上で実装していきます。流れとしては下記の通りです。
APIを実装する前にLooker APIを簡単に紹介させて頂きます。不要な方は飛ばしてください。
公式ドキュメント
https://docs.looker.com/reference/api-and-integration/api-reference/v3.1
Lookからデータを取得する[Run Look]
https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#run_look
スケジュール配信再送[Run Scheduled Plan Once by Id]
ドキュメントだけでなく実際にAPIを試す環境もあります。コード実装せずにAPIを試せるのでとても助かります。
https://your-base-url.looker.com:19999/api-docs/index.html
Run Look を試す場合は SDK run_look(look_id, result_format) から look_id = 442, json をセットして Try it out! ボタンを押下すればすぐに結果が返ってきます。Python実装としては、 run_lookで得られた scheduled_plan_id をパラメータにして run_scheduled_plan_once_by_id を実行することになります。
APIを実装する前にLooker APIキーを取得します。
管理 > Users(Users) > Edit を選択します。
API3 Keys Edit Keys を選択します。
New API3 Key を選択してAPI Keyを発行します。
今後は、 Client ID と Client Secret を利用してAPIを実装します。
ディレクトリ構成は下記になります。
共通系処理を実装した後に本丸のスケジュール配信リトライ処理を実装していきます。
base_directory
|-- looker
| └- looker_api.py : 共通系処理
└-- looker_job_rerunner.py : スケジュール配信リトライ処理
最終的にAirflowのDAGから looker_job_rerunner.py をPythonOperatorで読み込みます。
looker_api.py ではログイン処理とリクエスト処理(get, post)を実装します。
※今回は取得(get)、新規作成(post)だけですが、必要に応じて更新(patch)、削除(delete)のリクエスト処理を実装してください
エンドポイント https://your-base-url.looker.com:19999/api/3.1/login に client_id と client_secret を利用してログインした後に access_token を保持します。
client_id と client_secret はAirflowの管理画面から設定してPG上で取得するようにしています。
import requests
from airflow import models
headers = {'content-type': 'application/json'}
class LookerAPI():
"""
https://docs.looker.com/reference/api-and-integration/api-reference/v3.1
LookerAPIクラス
ログイン処理および各種LookerAPIのエンドポイントへの接続
"""
def __init__(self, base_url='https://your-base-url.looker.com:19999/api/3.1'):
self.base_url = base_url
self.base_params = {
'verify_ssl': True,
'timeout': 120
}
url = self.base_url + '/login'
params = self.base_params.copy()
params['client_id'] = models.Variable.get('looker_client_id')
params['client_secret'] = models.Variable.get('looker_client_secret')
response_json = self.post(url=url, params=params)
if 'access_token' in response_json:
self.base_params['access_token'] = response_json['access_token']
else:
# 後ろ4文字のみ残してマスク
mask_client_secret = ('X' * (len(params['client_secret']) - 4)) + params['client_secret'][-4:]
raise Exception(f"Looker API Login Failed client_id:{params['client_id']} / client_secret:{mask_client_secret}")
def get(self, url, params) -> dict:
response = requests.get(url=url, params=params, headers=headers)
if response.status_code != 200:
raise Exception(f'status code: {response.status_code} / {response.json()}')
return response.json()
def post(self, url, params, json={}) -> dict:
response = requests.post(url=url, params=params, headers=headers, json=json)
if response.status_code != 200:
raise Exception(f'status code: {response.status_code} / {response.json()}')
return response.json()
looker_job_rerunner.py ではスケジュール配信の自動リトライ処理を実装します。
各エンドポイントは次の通りです。
Look取得
https://your-base-url.looker.com:19999/api/3.1/looks/:look_id/run/json
スケジュール配信
https://your-base-url.looker.com:19999/api/3.1/scheduled_plans/:scheduled_plan_id/run_once
尚、Look取得時に念の為キャッシュを破棄するパラメータを設定しています。
import logging
from base_directory.looker.looker_api import LookerAPI
logging.getLogger().setLevel(logging.INFO)
class LookerJobRerunner():
"""
Lookerの定時スケジュール配信で失敗となったジョブを取得し、再実行するためのクラス
制限として、ジョブの再実行まではするが再実行となったジョブが配信成功するかどうかはわからない
"""
# look_id 442 はスケジュール配信が失敗したジョブを取得するための帳票ID
def __init__(self, look_id=442):
self.api = LookerAPI()
self.look_id_for_fetch = look_id
def _fetch_scheduled_plan(self) -> [dict]:
url = f'{self.api.base_url}/looks/{self.look_id_for_fetch}/run/json'
params = self.api.base_params.copy()
params['cache'] = False
return self.api.get(url=url, params=params)
def _run_once_scheduled_plan(self, scheduled_plan_id):
url = f'{self.api.base_url}/scheduled_plans/{scheduled_plan_id}/run_once'
params = self.api.base_params.copy()
self.api.post(url=url, params=params)
def fetch_with_failure_and_retry(self):
logging.info('fetch_scheduled_plan')
scheduled_plan_dict_list = self._fetch_scheduled_plan()
logging.info('run_once_scheduled_plan')
for scheduled_plan_dict in scheduled_plan_dict_list:
logging.info(scheduled_plan_dict)
scheduled_plan_id = scheduled_plan_dict['scheduled_plan.id']
# 必要に応じて scheduled_job.name も取得
try:
_ = self._run_once_scheduled_plan(scheduled_plan_id=scheduled_plan_id)
except Exception as e:
"""
ここのExceptionに来るときは基本的に対処しようがないとき
スケジュール配信が失敗したあとで配信ジョブを一度削除して再作成したため対象の配信IDが見つからない
など
"""
logging.error(f'scheduled_plan_id: {scheduled_plan_id}')
logging.error(f'Exception: {e}')
return 200
Airflowの実装については割愛しますが、以上の実装を日次10時に起動することで「スケジュール配信の自動リトライ」を実現しています。
9時30分ごろにいつもなら出ているレポートが出ていないと連絡を頂きました。
10時に「スケジュール配信の自動リトライ」ジョブが起動することで運用チームが障害対応をせずに済んでいます。
※メンションに気づいた時は既に再配信されていたという状況です
本記事ではLooker APIを利用してスケジュール配信を自動リトライする方法を紹介しました。欲しい情報をLookであらかじめ定義しておき、Looker APIを利用してデータを取得、スケジュール配信を自動リトライする処理を実装しました。この記事をもとにLookerの運用負荷が少しでも軽減されれば幸いです。
また、Looker APIを応用すれば他にも色々なことができるので本記事が参考になればと思います。
Mobility Technologiesでは、Mobility領域で働きたい人、Mobilityの未来を一緒に考えてくれる人を募集しています!
https://hrmos.co/pages/mo-t/jobs