タクシーアプリ「GO」、法人向けサービス「GO BUSINESS」、タクシーデリバリーアプリ「GO Dine」の分析基盤を開発運用している伊田です。今回、dbt と Dataform を比較して Dataform を利用することにしましたので、導入経緯および Dataform の初期構築を紹介します。
※ 本記事の対象読者はELTツールを利用している方を対象にしています
これは MoT Engineer Challenge Week 2022 Spring の記事です。
本記事では、まず、dbt および Dataform というツールについて簡単に説明させて頂き、次に現在データ分析チームが抱えている課題について取り上げます。その後、2つのツールについて検証した内容を紹介し、その結果、Dataform の導入に至った経緯を説明します。また、最後に Dataform の初期構築で工夫した点についても紹介させて頂きます。
ツール導入に至るまでに様々な記事を参考にさせて頂きました。最初に謝辞を述べさせて頂きますとともに、参考にしたサイトは本記事の最後に一覧として記載させて頂いています。
※ 検証および初期構築は千田と伊田で実施しました
※ 検証は Engineer Challenge Week を利用して実施しました
dbt, Dataform という2つの製品は、ELTのうち、Transform をするためのツールです。つまり、分析基盤にデータが格納された後に、SQLを発行してデータの加工処理をするためのツールで、加えて null チェックや unique チェックなどのテスト、ドキュメンテーション、データリネージ、データパイプラインの実行・スケジューリング等の管理もすることができます。
現在、分析チームには、データマートのリリース速度や品質に課題があります。
結果として、下記の事象が発生しています。
こうした課題への対応として諸々機能がそろっている dbt や Dataform の検討をしました
Google検索による結果が下記です
※ 2022/3/31 確認
主観的なものとなりますが、基本的には SQL + dbt / Dataform のお作法に則る形であるので、データアナリストが触る部分としては、dbt も Dataform もそこまで学習コストは高くないと感じました。
一部コア部分の作り込みやCLI版については多少学習コストが必要だと思います。
機能比較には、こちらのチュートリアルを参考に行いました。
※ 主要なものを取り上げており、すべての機能を網羅しているわけではありません
データモデル定義
前処理、後処理
データロード
ソース定義
クエリの部品化
Viewの作成
Tableの作成
Tableの作成 incremental model
unique_key の指定がない場合は Insert 処理
merge into dest
using (
select
.
.
.
from source
where
created_at > (select max(created_at) from dest)
) as source
on False
when not matched then insert
.
.
.
unique_key の指定がある場合は Upsert 処理
merge into dest
using (
select
.
.
.
from source
where
created_at > (select max(created_at) from dest)
) as source
on dest.id = source.id
when matched then update set
.
.
.
when not matched then insert
.
.
.
incremental_strategy で insert_overwrite を指定した場合は DELETE INSERT によるパーティション置換処理
-- 定義ファイル
-- 当日と前日分を取得する。柔軟にやる場合は macro を使う
{% set partitions_to_replace = [
'date(current_date)',
'date(date_sub(current_date, interval 1 day))'
] %}
{{
config(
materialized='incremental',
incremental_strategy = 'insert_overwrite',
unique_key='order_id',
partition_by={
'field': 'order_date',
'data_type': 'date'
},
partitions = partitions_to_replace
)
}}
select
id as order_id,
user_id as customer_id,
order_date,
status
from research_dbt.raw_orders
{% if is_incremental() %}
where order_date in ({{ partitions_to_replace | join(',') }})
{% endif %}
merge into `myproject`.`research_dbt`.`stg_orders` as DBT_INTERNAL_DEST
using (
select
id as order_id,
user_id as customer_id,
order_date,
status
from research_dbt.raw_orders
where order_date in (date(current_date),date(date_sub(current_date, interval 1 day)))
) as DBT_INTERNAL_SOURCE
on FALSE
when not matched by source
and DBT_INTERNAL_DEST.order_date in (
date(current_date), date(date_sub(current_date, interval 1 day))
)
then delete
when not matched then insert
(`order_id`, `customer_id`, `order_date`, `status`)
values
(`order_id`, `customer_id`, `order_date`, `status`)
uniqueKey の指定がない場合は Insert 処理
insert into dest
select ... from source
where created_at > (select max(created_at) from dest)
unique_key の指定がある場合は Upsert 処理
merge dest T
using (
select
.
.
.
from source
where created_at > (select max(created_at) from dest)
) S
on T.id = S.id
when matched then update set
.
.
.
when not matched then
.
.
.
updatePartitionFilter の指定がある場合は パーティションのプルーニングが行われる
-- 定義ファイル
-- 前日分以降を更新対象にする。柔軟にやる場合は pre_operations を使う
config {
type: "incremental",
uniqueKey: ["order_id"],
bigquery: {
partitionBy: "order_date",
updatePartitionFilter: "order_date > DATE(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))"
}
}
select
id as order_id,
user_id as customer_id,
order_date,
status
from ${ref("raw_orders")}
where
order_date > DATE(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY))
merge `myproject.research_dataform.stg_orders` T
using (
select
id as order_id,
user_id as customer_id,
order_date,
status
from `myproject.research_dbt.raw_orders`
where
order_date > DATE(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY))
) S
on T.order_id = S.order_id
and T.order_date > DATE(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY))
when matched then
update set `order_id` = S.order_id,`customer_id` = S.customer_id,`order_date` = S.order_date,`status` = S.status
when not matched then
insert (`order_id`,`customer_id`,`order_date`,`status`) values (`order_id`,`customer_id`,`order_date`,`status`)
テスト
ドキュメント、データリネージ
スナップショット
ジョブ実行
スケジューラー
リカバリ / backfill
Slack通知
結論としては、Dataform を選択することにしました。不確定要素が多い中では、Dataform のほうがスモールスタートしやすいと判断しました。
結果が出て機能が物足りない場合は、dbt への移行も検討したいと思います。基本的な思想は同じなので移行は難しくなく、実績があれば予算も取りやすいと考えています。
今回は Dataform を選択しましたが、dbt と Dataform、この2つは素晴らしい製品だと思います。特に気に入っているのは ref 関数です。この関数があることでデータリネージとして可視化ができ、調査時に依存関係を簡単に把握することができます。また、ジョブ実行時も依存関係を考慮して自動的に順次実行してくれるのが嬉しいと感じています。
ここからは Dataform 導入にあたり初期構築をどのようにしたか紹介したいと思います。
※ ここからはチュートリアル程度の知識がある前提で記述しています
下記の理由からSaaS版とCLI版を併用することにしました。
運用の流れとしては下記を想定しています。
コードは GitHub と連携しています。
本番環境と開発環境は、GCPプロジェクトでわけています(データセット配下は同じ構成)。
デフォルトは開発環境に向くようにして、master にマージされて初めて本番環境に処理が向くようにしています。
{
"environments": [
{
"name": "development",
"configOverride": {},
"gitRef": "develop"
},
{
"name": "production",
"configOverride": {
"defaultDatabase": "prod-project"
},
"gitRef": "master"
}
]
}
definitions 配下(SQL置き場)は ベストプラクティスに則ってディレクトリを切りました。
また、SaaS版で生成された初期ファイルに加えて、CLI版の利用や各種スクリプトを tools 配下に切っています。
.
├── definitions
│ ├── playground
│ ├── reporting
│ ├── sources
│ └── staging
├── includes
│ └── date_config.js
├── dataform.json
├── dataform_prod.json
├── environments.json
├── package-lock.json
├── package.json
└── tools
├── cli
└── scripts
テーブル名.sqlx としています。
例えば、データマートにテーブルを作る場合は下記となります。
dataset_id.table_name の場合
config {
type: "incremental",
tags: ["dataform_test_dag_v1"],
schema: "dataset_id",
uniqueKey: ["id"],
bigquery: {
partitionBy: "DATE(ts)",
updatePartitionFilter: "ts >= raw_start_ts"
}
}
SELECT
.
.
.
FROM ${ref("ref_dataset_id", "ref_table_name")}
SaaS版、CLI版ともに動的な日付を指定できるような JavaScript を作成しました。
まず、dataform.json に下記の通り変数を定義しています。
dataform.json
{
"warehouse": "bigquery",
"defaultSchema": "dataform",
"assertionSchema": "dataform_assertions",
"defaultDatabase": "dev-project",
"vars": {
"shouldOverrideVars": "false",
"targetStartTs": "2022-04-01 09:00:00+9",
"targetEndTs": "2022-04-01 10:00:00+9"
}
}
includes/date_config.js
最終的に生成する日付は4つです。
日付を4つ定義しているのは、処理対象のテーブルにはストリーミングインサートで取り込み時間パーティション分割テーブルに挿入されたデータがあり、そのようなテーブルに対しては、_PARTITIONTIME に raw_start_ts と raw_end_ts を使って一時フィルタリングを行い、最終的に created_at のような実際に処理対象としたいタイムスタンプに start_ts と end_ts を使って絞り込むためです。
SaaS版で実行する時はshouldOverrideVars は必ず false です。
CLI版で実行するときは、shouldOverrideVarsはtrue を指定して、targetStartTs と targetEndTs に任意の期間を指定します。
date_config.js
function getStartTs(unit, start_ago) {
if (`${dataform.projectConfig.vars.shouldOverrideVars}` == "true") {
return `TIMESTAMP('${dataform.projectConfig.vars.targetStartTs}')`;
} else {
return `TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ${start_ago} ${unit}), HOUR)`;
}
}
function getEndTs(unit, end_ago) {
if (`${dataform.projectConfig.vars.shouldOverrideVars}` == "true") {
return `TIMESTAMP('${dataform.projectConfig.vars.targetEndTs}')`;
} else {
return `TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ${end_ago} ${unit}), HOUR)`;
}
}
function getRawStartTs(unit, start_ago, start_margin) {
return `TIMESTAMP_SUB(${getStartTs(unit, start_ago)}, INTERVAL ${start_margin} ${unit})`
}
function getRawEndTs(unit, end_ago, end_margin) {
return `TIMESTAMP_ADD(${getEndTs(unit, end_ago)}, INTERVAL ${end_margin} ${unit})`
}
/*
BigQuery Scripting
*/
function createTemporaryFunctionGetHourUnitTs(start_ago=1, end_ago=0, start_margin=0, end_margin=0) {
return `"""
create temporary function getHourUnitTs(ts STRING) AS (
CASE ts
WHEN 'raw_start_ts' THEN ${getRawStartTs('HOUR', start_ago, start_margin)}
WHEN 'raw_end_ts' THEN ${getRawEndTs('HOUR', end_ago, end_margin)}
WHEN 'start_ts' THEN ${getStartTs('HOUR', start_ago)}
WHEN 'end_ts' THEN ${getEndTs('HOUR', end_ago)}
END
);
"""`
}
function createTemporaryFunctionGetDayUnitTs(start_ago=1, end_ago=0, start_margin=0, end_margin=0) {
return `"""
create temporary function getDayUnitTs(ts STRING) AS (
CASE ts
WHEN 'raw_start_ts' THEN ${getRawStartTs('DAY', start_ago, start_margin)}
WHEN 'raw_end_ts' THEN ${getRawEndTs('DAY', end_ago, end_margin)}
WHEN 'start_ts' THEN ${getStartTs('DAY', start_ago)}
WHEN 'end_ts' THEN ${getEndTs('DAY', end_ago)}
END
);
"""`
}
module.exports = {
createTemporaryFunctionGetHourUnitTs,
createTemporaryFunctionGetDayUnitTs
};
使い方としては下記です。
createTemporaryFunctionGetHourUnitTs
引数(=デフォルト値)
pre_operations 内で、EXECUTE IMMEDIATE FORMAT を実行することで、create temporary function を実行し、日付を取得できるようにしています。
-- createTemporaryFunctionGetHourUnitTs
pre_operations {
EXECUTE IMMEDIATE FORMAT(${date_config.createTemporaryFunctionGetHourUnitTs(
/* start_ago = */ 1,
/* end_ago = */ 0,
/* start_margin = */ 24,
/* end_margin = */ 24)});
}
SELECT
.
.
.
FROM $ref("dataset_id", "table_name")
WHERE
_PARTIONTIME >= getHourUnitTs('raw_start_ts')
AND _PARTIONTIME < getHourUnitTs('raw_end_ts')
AND created_at >= getHourUnitTs('start_ts')
AND created_at < getHourUnitTs('end_ts')
2022/5/2 10:10 (JST) に実行した場合、
となります。
ここからは Dataform 導入にあたり整備したツール類を紹介します。
tools/cli 配下は下記のようになっています。
tools/cli
├── Dockerfile
├── README.md
├── compiled
├── compiled_json_analyzer.js
├── deploy.sh
├── df-credentials.json
├── df-credentials_prod.json
├── docker-compose.yaml
├── docker-compose_prod.yaml
└── settings.json
2種類あるファイルは、本番環境と開発環境用で無印が開発環境用です。docker image は本番用と開発用で切り分けています。
Dockerfile
_env=”_prod” が渡されると本番用です。
FROM node:17-buster-slim
ARG _env=""
# 基本的に依存するものはないのでコンテナ内で使う可能性があるものを追記する
RUN apt-get update \
&& apt-get dist-upgrade -y \
&& apt-get install -y --no-install-recommends \
vim \
jq \
&& apt-get clean \
&& rm -rf \
/var/lib/apt/lists/* \
/tmp/* \
/var/tmp/*
WORKDIR /usr/app/dataform
RUN npm i -g @dataform/cli@1.21.1
# dataform cli 使用時の設定
COPY tools/cli/settings.json /root/.dataform/
# OAuth 認証のため接続先のプロジェクトのみが記載されている
COPY tools/cli/df-credentials${_env}.json /usr/app/dataform/.df-credentials.json
# 資材
COPY definitions /usr/app/dataform/definitions
COPY includes /usr/app/dataform/includes
COPY dataform${_env}.json /usr/app/dataform/dataform.json
COPY package.json /usr/app/dataform/
RUN dataform install .
ENTRYPOINT tail -f /dev/null
setting.json
dataform init で生成されるファイルです。
{
"allowAnonymousAnalytics": true,
"anonymousUserId": "your-anonymous-user-id"
}
df-credentials.json
同じく、dataform init で生成されるファイルです。
{
"projectId": "dev-project",
"location": "US"
}
docker-compose.yaml
version: "3"
services:
dataform:
# image: your-image-path
build:
context: ../../
dockerfile: tools/cli/Dockerfile
args:
_env: ""
container_name: dev
volumes:
- ~/.config/gcloud:/root/.config/gcloud
- ../../definitions:/usr/app/dataform/definitions
- ../../includes:/usr/app/dataform/includes
# - ./compiled:/usr/app/dataform/compiled
# - ./compiled_json_analyzer.js:/usr/app/dataform/compiled_json_analyzer.js
この docker image を Airflow の GKEPodOperator で呼び出して Dataform を実行しています。
実行コマンドは下記です。
dataform run \
--actions destination \
--vars=shouldOverrideVars=true,targetStartTs='YYYY-MM-DD hh:mi:ss+9',targetEndTs='YYYY-MM-DD hh:mi:ss+9'
dbt のこちらの記事を参考に、Airflow の1タスク = Dataform の1テーブル生成処理としたかったのでツールを作りました。ただし、Airflow 上で DAG の解析に負荷を掛けることをしたくないため、Airflow 上で動的に作るのではなく、タスクの依存関係を考慮した Airflow 用のコードを出力するツールを用意しました。
dbt の manifest.json に相当するデータは下記のコマンドから出力できます。
dataform compile --json > manifest.json
Dataform でコンパイルされたクエリをファイルとして生成したくて compiled_json_analyzer.js というツールを用意しました(dbt はコンパイル時にクエリが出力されます)。
docker コンテナ内で下記のコマンドを打つと、json ファイルを解析して SQL ファイルに変換してくれます。
dataform compile --json | node compiled_json_analyzer.js
既存のテーブルを Dataform の declaration として取り込みたいので、BigQuery のデータセットを指定すると、データセット配下のテーブルを declaration ファイルとして出力するツールを用意しました。
本記事では、dbt と Dataform を比較検討し、Dataform の導入に至った背景を説明しました。また、Dataform の初期構築のアイデアも紹介させて頂きました。
今後は Dataform を分析チーム内に浸透させ、当初の課題だったデータアナリストが気軽にデータパイプラインを作れない状況を減らし、野良スケジューリングクエリを Dataform に移行させることや、ビジネスロジックを Looker に作り込まないように是正をしていきたいと考えています。加えて、分析基盤のデータの品質向上に注力できる状態を作っていきたいと考えています。
この比較記事が皆様のご参考になれば幸いです。
興味のある方は 採用ページ も見ていただけると嬉しいです。
データエンジニアは こちら から
Twitter @mot_techtalk のフォローもよろしくお願いします!