こんにちは、SREグループのカンタンです!GO株式会社では複雑なワークフローを実現するためにAirflowを利用しています。Kubernetesを活かして、マルチテナンシーによるセキュリティ課題を解決しシームレスな開発体験を提供するAirflow基盤を用意しました。今回はその基盤をご紹介します。
Airflowとはワークフローを管理するためのプラットフォームです。実行したいタスクとその依存関係を一つのDAG (DAG = Directed Acyclic Graph = 有向非巡回グラフ) としてPythonファイルで定義すると、その依存関係に従ってタスクが実行されます。更に、タスクの状態、履歴やログなどをGUIで確認できます。
Kubernetes、AWS S3、BigQuery、SQL DBなど様々なインテグレーションが存在していて機能も豊富なため非常に人気があります。
bashを使って hello を出力するタスクとPythonで airflow を出力するタスクのDAGサンプル:
from datetimeimport datetime
from airflowimport DAG
from airflow.decoratorsimport task
from airflow.operators.bashimport BashOperator
# DAG = ワークフロー
with DAG(
dag_id="demo",
start_date=datetime(2022, 1, 1),
schedule="0 0 * * *",
) as dag:
# Operator = タスク
# bashタスク
hello = BashOperator(task_id="hello", bash_command="echo hello")
# Pythonタスク
@task()
def airflow():
print("airflow")
# タスクの依存関係を定義
hello >> airflow()
SREグループはAWSを利用することが多いため、Amazon Managed Workflows for Apache Airflow (Amazon MWAA)というマネージドAirflowサービスを利用しています。
VPCとS3バケットを用意すればAmazon MWAA環境が作れます。以下のようなアーキテクチャになります:
AirflowのSchedulersがワークフローとタスクのスケジューリングを行い、Workersがタスクを実際に実行しています。タスクの実行方法がタスクの種類によります:BashやPythonタスクはWorkersに直接実行されますが、例えばAirflowのKubernetesPodOperatorタスクを利用するとWorkersがKubernetesクラスタ上のPodを作成するため主な処理をPodにオフロードできます。
Amazon MWAA環境を作成するためのサンプルTerraform設定:
locals {
s3_bucket_arn = "..."
security_group_ids = [...]
subnet_ids = [...]
eks_cluster_arns = [...]
}
# --------------------
# Amazon MWAA (Amazon Managed Workflows for Apache Airflow)
# --------------------
resource "aws_mwaa_environment" "this" {
name = "demo-airflow"
# Network
network_configuration {
security_group_ids = local.security_group_ids
subnet_ids = local.subnet_ids
}
webserver_access_mode = "PUBLIC_ONLY"
# Environment
environment_class = "mw1.small"
execution_role_arn = aws_iam_role.execution_role.arn
min_workers = 1
max_workers = 10
schedulers = 2
weekly_maintenance_window_start = "MON:10:00"
airflow_version = "2.5.1"
# S3 bucket
source_bucket_arn = local.s3_bucket_arn
dag_s3_path = "/"
}
# --------------------
# Airflow execution role
# --------------------
resource "aws_iam_policy" "execution_role" {
name = "demo-airflow-execution-role"
path = "/"
description = "Policy for demo-airflow execution role"
policy = jsonencode({
"Version" : "2012-10-17"
"Statement" : [
{
"Sid" : "AllowAirflowPublishMetrics"
"Effect" : "Allow",
"Action" : "airflow:PublishMetrics",
"Resource" : "arn:aws:airflow:ap-northeast-1:XXXXXXXXXX:environment/demo-airflow"
},
{
"Sid" : "DenyS3ListAllMyBuckets",
"Effect" : "Deny",
"Action" : "s3:ListAllMyBuckets",
"Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ]
},
{
"Sid" : "AllowGetBucketObjects",
"Effect" : "Allow",
"Action" : [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*"
],
"Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ]
},
{
"Sid" : "AllowCloudWatchLog",
"Effect" : "Allow",
"Action" : [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults"
],
"Resource" : "arn:aws:logs:ap-northeast-1:XXXXXXXXXX:log-group:airflow-demo-airflow-*"
},
{
"Sid" : "AllowCloudWatchLogGroupsDescription",
"Effect" : "Allow",
"Action" : "logs:DescribeLogGroups",
"Resource" : "*"
},
{
"Sid" : "ALlowCloudWatchPutMetrics",
"Effect" : "Allow",
"Action" : "cloudwatch:PutMetricData",
"Resource" : "*"
},
{
"Sid" : "AllowAirflowCelerySQS",
"Effect" : "Allow",
"Action" : [
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource" : "arn:aws:sqs:ap-northeast-1:*:airflow-celery-*"
},
{
"Sid" : "AllowKMS",
"Effect" : "Allow",
"Action" : [
"kms:Decrypt",
"kms:DescribeKey",
"kms:GenerateDataKey*",
"kms:Encrypt"
],
"Condition" : {
"StringLike" : {
"kms:ViaService" : [
"sqs.ap-northeast-1.amazonaws.com"
]
}
},
"NotResource" : "arn:aws:kms:*:XXXXXXXXXX:key/*"
},
{
"Sid" : "ALlowEKSClusterAccess",
"Effect" : "Allow",
"Action" : "eks:DescribeCluster",
"Resource" : local.eks_cluster_arns
}
]
})
}
resource "aws_iam_role" "execution_role" {
name = "demo-airflow-execution-role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"airflow.amazonaws.com",
"airflow-env.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}
resource "aws_iam_role_policy_attachment" "execution_role" {
role = aws_iam_role.execution_role.name
policy_arn = aws_iam_policy.execution_role.arn
}
AirflowのKubernetesPodOperatorタスクを利用することでKubernetesのPodを作成できます。重い処理をPodにオフロードすることでAirflowのWorkerの負荷を下げることはもちろん、SREグループとしては既に運用しているKubernetes基盤をそのまま活かせるのが最大のメリットです(ログ収集や監視仕組み、コンテナ化、シークレット管理など)。
PodをAirflowで作成する際、コンテナのDockerイメージやメモリリソースなど、そのPodのスペックをKubernetesPodOperatorに渡す必要があります。渡し方が主に2つあります:インラインで引数として渡すか、PodのスペックをPodTemplate YAMLファイルに定義しファイルパスを指定するか。
from datetime import datetime
from airflow import models # do not remove
default_args = {
'provide_context': True
}
with models.DAG(
dag_id='inline_pod',
default_args=default_args,
schedule_interval=None,
) as dag:
# タスク
task1 = KubernetesPodOperator(
dag=dag,
task_id="task1",
# k8s settings
in_cluster=True,
config_file="kube_config.yaml",
# pod settings
namespace="default",
image="alpine:3.18.0",
image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"],
labels={"foo": "bar"},
# lifecycle
is_delete_operator_pod=True,
get_logs=True,
)
task2 = KubernetesPodOperator(
dag=dag,
task_id="task2",
# k8s settings
in_cluster=True,
config_file="kube_config.yaml",
# pod settings
namespace="default",
image="alpine:3.18.0",
image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"],
labels={"foo": "bar"},
# lifecycle
is_delete_operator_pod=True,
get_logs=True,
)
# 依存関係
task1 >> task2
pod-template.yaml ファイル
apiVersion: v1
kind: Pod
metadata:
namespace: default
labels:
foo: bar
spec:
containers:
- name: base
image: alpine:3.18.0
command: ["sh", "-c", "echo 'Default'"]
resources:
limits:
memory: 32Mi
requests:
cpu: 20m
memory: 32Mi
imagePullSecrets:
- name: my-secret
DAGファイル
from datetime import datetime
from airflow import models # do not remove
default_args = {
'provide_context': True
}
with models.DAG(
dag_id='template_pod',
default_args=default_args,
schedule_interval=None,
) as dag:
# タスク
task1 = KubernetesPodOperator(
dag=dag,
task_id="task1",
# k8s settings
in_cluster=True,
config_file="kube_config.yaml",
# pod settings
pod_template_file="pod-template.yaml", # テンプレートファイル指定
cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"], # cmdのみを上書き
# lifecycle
is_delete_operator_pod=True,
get_logs=True,
)
task2 = KubernetesPodOperator(
dag=dag,
task_id="task2",
# k8s settings
in_cluster=True,
config_file="kube_config.yaml",
# pod settings
pod_template_file="pod-template.yaml", # テンプレートファイル指定
cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"], # cmdのみを上書き
# lifecycle
is_delete_operator_pod=True,
get_logs=True,
)
# 依存関係
task1 >> task2
インライン方法の方がシンプルですがPodのスペックが大きくなるとDAGが読みづらくなるのと、例えばDockerイメージタグのみを変更したい場合はスペックが別ファイルに分かれていた方が運用がしやすいでしょう。
Airflowを本格的に利用する前に取り組まないといけない課題がいくつかありました。
Airflowのインテグレーションが多くて、PythonコードでWorkerからデータベースに直接繋いでクエリを実行したり、HTTPサービスにリクエストを直接送ったりすることが簡単にできてしまいます。Airflowがよく利用されている分析用途のETL処理の文脈であれば特に懸念がないですが、GOでは様々なマイクロサービスのデータを集計したり比較したり修正したりしています。Airflowからそれぞれのマイクロサービスのデータベースを直接参照すると認証情報をAirflowに保存しないといけないのと、DBスキーマが変わった時にAirflowのDAGに反映する必要があって、メンテナンスが大変になります。更に、例えばGolangで書かれた注文管理APIサービスがあったとして、GolangのAPIサーバもPythonのAirflow DAGもデータベースを参照していて、言語がバラバラで運用が複雑になります。
Airflowからデータベースを直接参照するより、Airflowで実行したい処理を注文管理サービスの一部の機能としてジョブとして定義し、Airflowから注文管理サービスのPodを作成する形の方が運用しやすいです。Airflowはあくまでもトリガーだけになり、処理自体は注文管理サービスの一部になってAPIサーバと同様に管理できます:
同様に、例えばAirflowからユーザ管理サービスにHTTPリクエストを直接送って処理させるより、ユーザ管理サービスのPodを動かして処理した方が良いです。
Airflowの様々なインテグレーションを使うよりも、Airflowを主にKubernetes Podのスケジューリングに利用しています。以下の画像では、左が様々な依存関係が生まれる直接参照の例で、右側が採用しているPod作成の例です。
SREグループではKubernetes基盤を提供していて、それぞれのアプリケーション(サービス)のAPIサーバやCron Jobなどのデプロイが自動化されています。アプリケーションリポジトリにプッシュするとAPIサーバやCron Jobなどが自動的にクラスタにデプロイされます。
Airflowから作成されるPodの管理もなるべく既存の基盤に合わせることで全体の開発体験をシームレスにすると良いでしょう。特にDAGが作成するPodのDockerイメージタグをデプロイのたびに手動で更新することは難しいです。この後解決案を紹介します。
Airflowはマルチテナントなシステムではないです。例えばAirflowのS3バケットをテナントごとフォルダ分けしアップロード権限をユーザによって最小限に設定したとしても、テナントAがアップロードしたDAGからAirflowのデータベースにアクセスし他のDAGの削除やユーザの変更ができてしまいます。
更に、Airflowを使ってKubernetes Podを作成する場合、Airflowに強いKubernetes権限を付与する必要があるため、テナントAが他のテナントのPodを作成できてしまいます。例えば:
こういう問題を厳密に解決するにはAirflow環境をテナントごとに分けるしかないですがAirflowのマネージドサービスのコスト面が気になります。例えば mw1.small サイズのAmazon MWAA環境は最低でも月額$350以上かかります。10テナントがあった場合、月額$3500以上かかってしまいます。この後解決案を紹介します。
これからSREグループで考えた方針と提供しているAirflow基盤について紹介したいと思います。
前述の課題に取り組むように以下の方針を決めています
サンプル
airflow-bucket
├── dags
│ ├── namespaceA # ネームスペースAのDAGを管理
│ │ ├── dagA-1.py
│ │ └── dagA-2.py
│ └── namespaceB # ネームスペースBのDAGを管理
│ ├── dagB-1.py
│ ├── dagB-2.py
│ └── dagB-3.py
└── templates
├── namespaceA
│ ├── application1 # アプリケーション1のPodTemplateを管理
│ │ └── pod-application1-template1.yaml
│ └── application2 # アプリケーション2のPodTemplateを管理
│ └── pod-application2-template1.yaml
└── namespaceB
└── application3 # アプリケーション3のPodTemplateを管理
├── pod-application3-template1.yaml
└── pod-application3-template2.yaml
Airflowを利用しやすくするため、DAGをネームスペースごとのgitリポジトリで管理し、gitフローで運用しています。リポジトリにPRを出してマージすると、CIによる自動デプロイが走りS3バケットにアップロードされます。
エンジニアが慣れているgitフローで運用することでAirflowへの反映方法を気にしてなくても良くて自然な開発体験になります:
自動デプロイすることでエンジニアにS3権限を付与する必要がないのと、GitHubのCODEOWNERSとブランチ保護ルールを設定することで本番環境へのデプロイの際にSREメンバーのレビューを必須にしていてDAG内容の確認ができます。
SREが提供しているKubernetes基盤ではアプリケーションリポジトリにプッシュすると、
Airflowから作成されるPodも同様にデプロイできるように以下の方針にしています:
そうすることで、アプリケーションリポジトリにプッシュすることだけでAPIサーバもCron JobもAirflowのPodTemplateもデプロイされます。リバートする必要があった場合には、APIサーバもCron JobもAirflowのPodTemplateもリバートされます。DAG管理と同様に、エンジニアが慣れているgitフローで運用すれば良くて、Airflowのことを気にする必要がないです。
Kubernetesクラスタへのデプロイ方法、DAGのデプロイ方法とPodTemplateのデプロイ方法をまとめた全体図になります。
テナントごとのAirflow環境ではなく、一つの環境だけを用意しマルチテナントと近い形にすることで料金を大幅に抑えています。以下の方針でセキュリティが担保されています。
DAGを本番環境にデプロイする際にSREレビューが必須になるため、Airflowデータベースを直接参照したり別のネームスペースのPodを作成したりすることを防げます。レビュー不足で他のテナントに影響を与える可能性が残っていますがリスクを抑えています。
DAGのデプロイもPodTemplateのデプロイも自動化されているため、エンジニアに権限を付与する必要がないのと、CIのIAM権限を最小限にできます。
慣れているgitフローで運用することだけでデプロイやリバートなどができますのでAirflowの利用がかなり楽になっています。
DAG管理:
アプリケーションコード管理:
PodTemplate管理:
DAGを本番環境にデプロイするためにSREメンバーのレビューが必須になるため、開発者の作業が止まる可能性があります。この辺りが唯一妥協しているポイントになります。Airflowは将来的にマルチテナントになるようで、今後のリリースに期待しています!
AirflowとKubernetesの相性を活かしたシームレスな開発体験を提供するAirflow基盤を用意しました。Airflowのマルチテナンシー課題を考慮しセキュリティを妥協しないマルチテナントと近い形の基盤に至りました。Amazon MWAAを利用していますがGCP Cloud Composerなど他のAirflowサービスにも適用できるためこの記事がご参考になれば幸いです!
興味のある方は 採用ページ も見ていただけると嬉しいです。
Twitter @goinc_techtalk のフォローもよろしくお願いします!