MoTLab -GO Inc. Engineering Blog-MoTLab -GO Inc. Engineering Blog-

シームレスなAirflowワークフロー基盤

SRE
May 31, 2023

こんにちは、SREグループのカンタンです!GO株式会社では複雑なワークフローを実現するためにAirflowを利用しています。Kubernetesを活かして、マルチテナンシーによるセキュリティ課題を解決しシームレスな開発体験を提供するAirflow基盤を用意しました。今回はその基盤をご紹介します。


Airflow

Airflowとはワークフローを管理するためのプラットフォームです。実行したいタスクとその依存関係を一つのDAG (DAG = Directed Acyclic Graph = 有向非巡回グラフ) としてPythonファイルで定義すると、その依存関係に従ってタスクが実行されます。更に、タスクの状態、履歴やログなどをGUIで確認できます。

Kubernetes、AWS S3、BigQuery、SQL DBなど様々なインテグレーションが存在していて機能も豊富なため非常に人気があります。

An image from Notion

bashを使って hello を出力するタスクとPythonで airflow を出力するタスクのDAGサンプル:

from datetimeimport datetime
from airflowimport DAG
from airflow.decoratorsimport task
from airflow.operators.bashimport BashOperator

# DAG = ワークフロー
with DAG(
	dag_id="demo",
	start_date=datetime(2022, 1, 1),
	schedule="0 0 * * *",
) as dag:
	
		# Operator = タスク
		# bashタスク
		hello = BashOperator(task_id="hello", bash_command="echo hello")

		# Pythonタスク
    @task()
		def airflow():
        print("airflow")

		# タスクの依存関係を定義
		hello >> airflow()

Amazon MWAA

SREグループはAWSを利用することが多いため、Amazon Managed Workflows for Apache Airflow (Amazon MWAA)というマネージドAirflowサービスを利用しています。

VPCとS3バケットを用意すればAmazon MWAA環境が作れます。以下のようなアーキテクチャになります:

  • Service VPC
    • AWS管理のVPC
    • AirflowのデータベースやWeb Serverなどが動いている
  • Customer VPC
    • 環境を作るために用意するVPC
    • AirflowのSchedulersとWorkersが動いている
  • S3バケット (DAG Registry)
    • 環境を作るために用意するS3バケット
    • Airflowが利用するデータの保存場所。DAGのPythonファイルをそのバケットにアップロードすることでAirflowに反映できる

An image from Notion

AirflowのSchedulersがワークフローとタスクのスケジューリングを行い、Workersがタスクを実際に実行しています。タスクの実行方法がタスクの種類によります:BashやPythonタスクはWorkersに直接実行されますが、例えばAirflowのKubernetesPodOperatorタスクを利用するとWorkersがKubernetesクラスタ上のPodを作成するため主な処理をPodにオフロードできます。

Amazon MWAA環境を作成するためのサンプルTerraform設定:


locals {
  s3_bucket_arn      = "..."
  security_group_ids = [...]
  subnet_ids         = [...]
  eks_cluster_arns   = [...]
}

# --------------------
# Amazon MWAA (Amazon Managed Workflows for Apache Airflow)
# --------------------
resource "aws_mwaa_environment" "this" {
  name = "demo-airflow"

  # Network
  network_configuration {
    security_group_ids = local.security_group_ids
    subnet_ids         = local.subnet_ids
  }
  webserver_access_mode = "PUBLIC_ONLY"

  # Environment
  environment_class               = "mw1.small"
  execution_role_arn              = aws_iam_role.execution_role.arn
  min_workers                     = 1
  max_workers                     = 10
  schedulers                      = 2
  weekly_maintenance_window_start = "MON:10:00"
  airflow_version                 = "2.5.1"
  
  # S3 bucket
  source_bucket_arn = local.s3_bucket_arn
  dag_s3_path       = "/"
}

# --------------------
# Airflow execution role
# --------------------
resource "aws_iam_policy" "execution_role" {
  name        = "demo-airflow-execution-role"
  path        = "/"
  description = "Policy for demo-airflow execution role"

  policy = jsonencode({
    "Version" : "2012-10-17"
    "Statement" : [
      {
        "Sid" : "AllowAirflowPublishMetrics"
        "Effect" : "Allow",
        "Action" : "airflow:PublishMetrics",
        "Resource" : "arn:aws:airflow:ap-northeast-1:XXXXXXXXXX:environment/demo-airflow"
      },
      {
        "Sid" : "DenyS3ListAllMyBuckets",
        "Effect" : "Deny",
        "Action" : "s3:ListAllMyBuckets",
        "Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ]
      },
      {
        "Sid" : "AllowGetBucketObjects",
        "Effect" : "Allow",
        "Action" : [
          "s3:GetObject*",
          "s3:GetBucket*",
          "s3:List*"
        ],
        "Resource" : [ local.s3_bucket_arn, "${local.s3_bucket_arn}/*" ]
      },
      {
        "Sid" : "AllowCloudWatchLog",
        "Effect" : "Allow",
        "Action" : [
          "logs:CreateLogStream",
          "logs:CreateLogGroup",
          "logs:PutLogEvents",
          "logs:GetLogEvents",
          "logs:GetLogRecord",
          "logs:GetLogGroupFields",
          "logs:GetQueryResults"
        ],
        "Resource" : "arn:aws:logs:ap-northeast-1:XXXXXXXXXX:log-group:airflow-demo-airflow-*"
      },
      {
        "Sid" : "AllowCloudWatchLogGroupsDescription",
        "Effect" : "Allow",
        "Action" : "logs:DescribeLogGroups",
        "Resource" : "*"
      },
      {
        "Sid" : "ALlowCloudWatchPutMetrics",
        "Effect" : "Allow",
        "Action" : "cloudwatch:PutMetricData",
        "Resource" : "*"
      },
      {
        "Sid" : "AllowAirflowCelerySQS",
        "Effect" : "Allow",
        "Action" : [
          "sqs:ChangeMessageVisibility",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
          "sqs:GetQueueUrl",
          "sqs:ReceiveMessage",
          "sqs:SendMessage"
        ],
        "Resource" : "arn:aws:sqs:ap-northeast-1:*:airflow-celery-*"
      },
      {
        "Sid" : "AllowKMS",
        "Effect" : "Allow",
        "Action" : [
          "kms:Decrypt",
          "kms:DescribeKey",
          "kms:GenerateDataKey*",
          "kms:Encrypt"
        ],
        "Condition" : {
          "StringLike" : {
            "kms:ViaService" : [
              "sqs.ap-northeast-1.amazonaws.com"
            ]
          }
        },
        "NotResource" : "arn:aws:kms:*:XXXXXXXXXX:key/*"
      },
      {
        "Sid" : "ALlowEKSClusterAccess",
        "Effect" : "Allow",
        "Action" : "eks:DescribeCluster",
        "Resource" : local.eks_cluster_arns
      }
    ]
  })
}

resource "aws_iam_role" "execution_role" {
  name = "demo-airflow-execution-role"

  assume_role_policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "airflow.amazonaws.com",
                    "airflow-env.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF
}

resource "aws_iam_role_policy_attachment" "execution_role" {
  role       = aws_iam_role.execution_role.name
  policy_arn = aws_iam_policy.execution_role.arn
}

Kubernetes連携

AirflowのKubernetesPodOperatorタスクを利用することでKubernetesのPodを作成できます。重い処理をPodにオフロードすることでAirflowのWorkerの負荷を下げることはもちろん、SREグループとしては既に運用しているKubernetes基盤をそのまま活かせるのが最大のメリットです(ログ収集や監視仕組み、コンテナ化、シークレット管理など)。

PodをAirflowで作成する際、コンテナのDockerイメージやメモリリソースなど、そのPodのスペックをKubernetesPodOperatorに渡す必要があります。渡し方が主に2つあります:インラインで引数として渡すか、PodのスペックをPodTemplate YAMLファイルに定義しファイルパスを指定するか。

インラインでスペックを渡す場合

from datetime import datetime
from airflow import models # do not remove

default_args = {
    'provide_context': True
}

with models.DAG(
    dag_id='inline_pod',
    default_args=default_args,
    schedule_interval=None,
) as dag:
    # タスク
		task1 = KubernetesPodOperator(
				dag=dag,
				task_id="task1",
				# k8s settings
				in_cluster=True,
				config_file="kube_config.yaml",
				# pod settings
				namespace="default",
				image="alpine:3.18.0",
				image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
				cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"],
				labels={"foo": "bar"},
				# lifecycle
				is_delete_operator_pod=True,
				get_logs=True,	
    )

		task2 = KubernetesPodOperator(
				dag=dag,
				task_id="task2",
				# k8s settings
				in_cluster=True,
				config_file="kube_config.yaml",
				# pod settings
				namespace="default",
				image="alpine:3.18.0",
				image_pull_secrets=[k8s.V1LocalObjectReference("my-secret")],
				cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"],
				labels={"foo": "bar"},
				# lifecycle
				is_delete_operator_pod=True,
				get_logs=True,	
    )

    # 依存関係
    task1 >> task2

スペックをPodTemplate YAMLファイルとして定義する場合

pod-template.yaml ファイル

apiVersion: v1
kind: Pod
metadata:
  namespace: default
  labels:
    foo: bar
spec:
  containers:
    - name: base
      image: alpine:3.18.0
      command: ["sh", "-c", "echo 'Default'"]
      resources:
        limits:
          memory: 32Mi
        requests:
          cpu: 20m
          memory: 32Mi
  imagePullSecrets:
    - name: my-secret

DAGファイル

from datetime import datetime
from airflow import models # do not remove

default_args = {
    'provide_context': True
}

with models.DAG(
    dag_id='template_pod',
    default_args=default_args,
    schedule_interval=None,
) as dag:
    # タスク
		task1 = KubernetesPodOperator(
				dag=dag,
				task_id="task1",
				# k8s settings
				in_cluster=True,
				config_file="kube_config.yaml",
				# pod settings
				pod_template_file="pod-template.yaml", # テンプレートファイル指定
				cmds=["sh", "-c", "echo 'task1'; sleep 5; echo 'end'"], # cmdのみを上書き
				# lifecycle
				is_delete_operator_pod=True,
				get_logs=True,	
    )

		task2 = KubernetesPodOperator(
				dag=dag,
				task_id="task2",
				# k8s settings
				in_cluster=True,
				config_file="kube_config.yaml",
				# pod settings
				pod_template_file="pod-template.yaml", # テンプレートファイル指定
				cmds=["sh", "-c", "echo 'task2'; sleep 5; echo 'end'"], # cmdのみを上書き
				# lifecycle
				is_delete_operator_pod=True,
				get_logs=True,	
    )

    # 依存関係
    task1 >> task2

インライン方法の方がシンプルですがPodのスペックが大きくなるとDAGが読みづらくなるのと、例えばDockerイメージタグのみを変更したい場合はスペックが別ファイルに分かれていた方が運用がしやすいでしょう。

課題

Airflowを本格的に利用する前に取り組まないといけない課題がいくつかありました。

自由度が高すぎて不要な依存関係を生みやすい

Airflowのインテグレーションが多くて、PythonコードでWorkerからデータベースに直接繋いでクエリを実行したり、HTTPサービスにリクエストを直接送ったりすることが簡単にできてしまいます。Airflowがよく利用されている分析用途のETL処理の文脈であれば特に懸念がないですが、GOでは様々なマイクロサービスのデータを集計したり比較したり修正したりしています。Airflowからそれぞれのマイクロサービスのデータベースを直接参照すると認証情報をAirflowに保存しないといけないのと、DBスキーマが変わった時にAirflowのDAGに反映する必要があって、メンテナンスが大変になります。更に、例えばGolangで書かれた注文管理APIサービスがあったとして、GolangのAPIサーバもPythonのAirflow DAGもデータベースを参照していて、言語がバラバラで運用が複雑になります。

Airflowからデータベースを直接参照するより、Airflowで実行したい処理を注文管理サービスの一部の機能としてジョブとして定義し、Airflowから注文管理サービスのPodを作成する形の方が運用しやすいです。Airflowはあくまでもトリガーだけになり、処理自体は注文管理サービスの一部になってAPIサーバと同様に管理できます:

  • 同じgitリポジトリで管理
  • 同じ言語で書く
  • 同様に認証情報にアクセス
  • 同じ内部関数を使ってDBにアクセス
  • 同じDockerイメージで動く

同様に、例えばAirflowからユーザ管理サービスにHTTPリクエストを直接送って処理させるより、ユーザ管理サービスのPodを動かして処理した方が良いです。

Airflowの様々なインテグレーションを使うよりも、Airflowを主にKubernetes Podのスケジューリングに利用しています。以下の画像では、左が様々な依存関係が生まれる直接参照の例で、右側が採用しているPod作成の例です。

An image from Notion

シームレスな開発体験

SREグループではKubernetes基盤を提供していて、それぞれのアプリケーション(サービス)のAPIサーバやCron Jobなどのデプロイが自動化されています。アプリケーションリポジトリにプッシュするとAPIサーバやCron Jobなどが自動的にクラスタにデプロイされます。

An image from Notion

Airflowから作成されるPodの管理もなるべく既存の基盤に合わせることで全体の開発体験をシームレスにすると良いでしょう。特にDAGが作成するPodのDockerイメージタグをデプロイのたびに手動で更新することは難しいです。この後解決案を紹介します。

マルチテナンシー

Airflowはマルチテナントなシステムではないです。例えばAirflowのS3バケットをテナントごとフォルダ分けしアップロード権限をユーザによって最小限に設定したとしても、テナントAがアップロードしたDAGからAirflowのデータベースにアクセスし他のDAGの削除やユーザの変更ができてしまいます。

更に、Airflowを使ってKubernetes Podを作成する場合、Airflowに強いKubernetes権限を付与する必要があるため、テナントAが他のテナントのPodを作成できてしまいます。例えば:

  1. テナントAのDAGがWorkerから実行される
  2. DAGのタスクに従ってWorkerがAirflowのデータベースを直接アクセス
  3. テナントBのDAGが削除される
  4. テナントBのPodを作成してしまう
An image from Notion

こういう問題を厳密に解決するにはAirflow環境をテナントごとに分けるしかないですがAirflowのマネージドサービスのコスト面が気になります。例えば mw1.small サイズのAmazon MWAA環境は最低でも月額$350以上かかります。10テナントがあった場合、月額$3500以上かかってしまいます。この後解決案を紹介します。

SREのAirflow基盤

これからSREグループで考えた方針と提供しているAirflow基盤について紹介したいと思います。

方針

前述の課題に取り組むように以下の方針を決めています

  • Airflowは主にKubernetesのPodを作成するために利用する(データベースへの直接参照などがない)
  • Podを作成する際、スペックをPodTemplate YAMLファイルで指定する
  • AirflowのテナントをKubernetesネームスペースと一致させる
  • 本番用のAirflow環境と開発用のAirflow環境を一つずつ用意する
  • Airflow用のS3バケットのフォルダ構造を以下にする
    • dagsフォルダ:DAGをネームスペースごとにフォルダ分けし管理
    • templatesフォルダ:PodTemplateをネームスペースとアプリケーションごとにフォルダ分けし管理

サンプル

airflow-bucket
├── dags
│   ├── namespaceA # ネームスペースAのDAGを管理
│   │   ├── dagA-1.py
│   │   └── dagA-2.py
│   └── namespaceB # ネームスペースBのDAGを管理
│       ├── dagB-1.py
│       ├── dagB-2.py
│       └── dagB-3.py
└── templates
    ├── namespaceA
    │   ├── application1 # アプリケーション1のPodTemplateを管理
    │   │    └── pod-application1-template1.yaml
    │   └── application2 # アプリケーション2のPodTemplateを管理
    │        └── pod-application2-template1.yaml
    └── namespaceB
        └── application3 # アプリケーション3のPodTemplateを管理
            ├── pod-application3-template1.yaml
            └── pod-application3-template2.yaml

DAG管理

Airflowを利用しやすくするため、DAGをネームスペースごとのgitリポジトリで管理し、gitフローで運用しています。リポジトリにPRを出してマージすると、CIによる自動デプロイが走りS3バケットにアップロードされます。

エンジニアが慣れているgitフローで運用することでAirflowへの反映方法を気にしてなくても良くて自然な開発体験になります:

  • developブランチにマージすると開発環境のAirflowに反映される
  • mainブランチにマージすると本番環境のAirflowに反映される

自動デプロイすることでエンジニアにS3権限を付与する必要がないのと、GitHubのCODEOWNERSとブランチ保護ルールを設定することで本番環境へのデプロイの際にSREメンバーのレビューを必須にしていてDAG内容の確認ができます。

An image from Notion

PodTemplate管理

SREが提供しているKubernetes基盤ではアプリケーションリポジトリにプッシュすると、

  • アプリケーションのDockerイメージがビルドされ新しいタグがDocker Registryにプッシュされる
  • APIサーバやCron Jobなどがクラスタにデプロイされる

Airflowから作成されるPodも同様にデプロイできるように以下の方針にしています:

  • AirflowからPodを作成する際、PodのスペックをPodTemplate YAMLファイルとして定義する
  • PodTemplateファイルをアプリケーションリポジトリで管理する
  • アプリケーションリポジトリにプッシュすることでPodTemplateが自動的にS3にデプロイされる (ビルドされたDockerイメージタグを反映させた上で)

An image from Notion

そうすることで、アプリケーションリポジトリにプッシュすることだけでAPIサーバもCron JobもAirflowのPodTemplateもデプロイされます。リバートする必要があった場合には、APIサーバもCron JobもAirflowのPodTemplateもリバートされます。DAG管理と同様に、エンジニアが慣れているgitフローで運用すれば良くて、Airflowのことを気にする必要がないです。

全体図

Kubernetesクラスタへのデプロイ方法、DAGのデプロイ方法とPodTemplateのデプロイ方法をまとめた全体図になります。

An image from Notion

セキュリティ観点

テナントごとのAirflow環境ではなく、一つの環境だけを用意しマルチテナントと近い形にすることで料金を大幅に抑えています。以下の方針でセキュリティが担保されています。

DAGを本番環境にデプロイする際にSREレビューが必須になるため、Airflowデータベースを直接参照したり別のネームスペースのPodを作成したりすることを防げます。レビュー不足で他のテナントに影響を与える可能性が残っていますがリスクを抑えています。

DAGのデプロイもPodTemplateのデプロイも自動化されているため、エンジニアに権限を付与する必要がないのと、CIのIAM権限を最小限にできます。

開発体験

慣れているgitフローで運用することだけでデプロイやリバートなどができますのでAirflowの利用がかなり楽になっています。

DAG管理:

  • DAGがネームスペースごとの専用リポジトリで管理されているため、他のネームスペースとコンフリクトが発生しない
  • gitフローで自然にデプロイされる
  • 開発環境の場合はSREレビューが必須ではないためブロッカーにならない

アプリケーションコード管理:

  • AirflowからアプリケーションのPodを作成することで、Pythonのコードを用意する必要がなくて、アプリケーションと同じ言語、コード、Dockerイメージを利用できる

PodTemplate管理:

  • APIサーバやCron Jobなど、既存のワークロードと同じタイミングでデプロイ・リバートできる
  • gitフローで自然にデプロイされる

DAGを本番環境にデプロイするためにSREメンバーのレビューが必須になるため、開発者の作業が止まる可能性があります。この辺りが唯一妥協しているポイントになります。Airflowは将来的にマルチテナントになるようで、今後のリリースに期待しています!

終わりに

AirflowとKubernetesの相性を活かしたシームレスな開発体験を提供するAirflow基盤を用意しました。Airflowのマルチテナンシー課題を考慮しセキュリティを妥協しないマルチテナントと近い形の基盤に至りました。Amazon MWAAを利用していますがGCP Cloud Composerなど他のAirflowサービスにも適用できるためこの記事がご参考になれば幸いです!


We're Hiring!

📢
GO株式会社ではともに働くエンジニアを募集しています。

興味のある方は 採用ページ も見ていただけると嬉しいです。

Twitter @goinc_techtalk のフォローもよろしくお願いします!