【MWAA】AWSのワークフロー管理システムMWAAをさわってみた。

2022/4/03

(最終更新: 2022/4/07

はじめに

AWSサービスのジョブ管理用サービスに、MWAA(Managed Workflows for Apache Airflow)というものがあります。

仕事でも必要な知識なので、理解のために触ってみました。 初めて触る人、なんとなく理解したい人に向けて共有します。

※超初心者です。

前提

  • AWSのアカウントがあること。

MWAAとは

MWAAは、Apatch AirFlowというワークフロー管理システムをAWSマネジメントサービスとして提供したものです。

  • 順序性をもつバッチ処理を、事前にスケジュールした時刻に自動起動させることができます。
  • 中にバッチ処理を書いて実行もできます。
  • AWSの他サービスと連携できます。
  • マネージドサービスなので、比較的、管理は楽(なはず)です。
  • 順序性や起動設定は、「DAG」と呼ばれるpythonファイルで記述します。
参考資料

Amazon Managed Workflows for Apache Airflow (MWAA) とは - Amazon Managed Workflows for Apache Airflow

ジョブ管理サービスというと、他には日立のJP1等があります。

処理の流れをAirFlow、MWAAで管理するには

DAG定義

AirFlowの処理の流れは、DAG(Directed Acyclic Graph)と呼ばれる有向グラフで表されます。MWAAでも同様です。

このようなイメージです。

DAGイメージ

ー引用元:

DAGs — Airflow Documentation

aの処理が終わったらbとcの処理が動き出し、終わったらdの処理が動きます。

また、AirFlowではこれらの処理を、CRONのように「毎日何時から起動」とか「何分おきに起動」などと、スケジューリングすることができます。

DAGの定義は、Pythonで記述します。

実際にMWAAで処理を動かしてみる

MWAA環境の作成

まずはMWAA環境を構築してみます。

マネジメントコンソールから「MWAA」を検索します。

マネジメントコンソール

「環境を作成」を選択し、環境の詳細を設定します。

環境の詳細を設定する。

ここで、DAG定義ファイルが置いてあるS3の場所を指定する必要があるので、いったんS3バケットとDAG定義ファイルの作成に移ります。

S3バケット作成

S3バケットを作成します。

ここで、バケット名はairflow-〇〇〇というようにairflow-で始まるものにする必要があります。

S3バケット作成

パブリックアクセスはブロックする必要があります。

また、バージョニングは有効にする必要があります。

S3バケットの設定

そのほかは特別な設定は必要ないので、「バケットを作成」を実行します。

次に、作ったバケットの中にdagsフォルダを作成します。

dagsフォルダ

MWAA環境へのS3設定

S3バケットにairflow-〇〇〇を、DAGフォルダにairflow-〇〇〇/dagsを設定します。

S3設定

MWAA用VPCを作成

VPCを選ぶ必要があるので、ここでは「MWAA VPCを作成」を選びます。

既存のVPCを使おうかと思いましたが、VPCにもいろいろと条件があるようなので、ここではVPCを作成してしまいます。

CloudFormationの画面に飛ぶので、そのまま作成します。 CloudFormation画面

既存のVPCでやった場合

既存のVPCでやってみようと思いましたが、MWAA環境自体が「作成中」のまま止まってしまい上手くいきませんでした。

エラーの内容は以下。

You may need to check the execution role permissions policy for your environment, and that each of the VPC networking components required by the environment are configured to allow traffic. Troubleshooting:

既存VPCに作った仕組みと連携したい場合は、MWAAも既存のVPCに作る必要があるかもしれないので、実施するときは工夫が必要そうです。(ここは勉強不足)

MWAA環境のネットワークを設定

VPCが作成できたら、VPCを選択します。

サブネットは、AZ-1aとAZ-1cのプライベートサブネットをそれぞれ選びます。

ネットワークは「公開ネットワーク」を選びます。

セキュリティグループは新しく作成します。

MWAAのネットワーク設定

その他設定

IAMロールは新しいものを作成します。

IAMロール

他の設定はそのままとします。

DAG定義を作成してアップロード

DAGの定義を作成します。

こちらのページのものをほぼそのまま使わせていただきました。

dags/hello_airflow.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# DAGを作る
default_args = {
    'owner': 'myname',
    'depends_on_past': False,
    'start_date': datetime(2022, 4, 3, 10, 0, 0),
    'schedule_interval': timedelta(minutes=3),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('first_dag', default_args=default_args)

# DAGと紐づくタスクを作る
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='echo t2',
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

これをS3のdags/フォルダにアップロードします。

DAG定義の中身

default_argsでは、各種パラメータを定義しています。

  • start_date:スケジューリングの開始時刻
  • schedule_interval:実行間隔
  • retries:リトライ回数
  • retry_delay:リトライ間隔

t1t4では、bash_commandのパラメータでbashコマンドを定義しています。

最後は各タスクの依存関係を定義しています。上記のようにset_upstreamを使うか、以下のように記載します。

t1 >> [t2, t3]

処理実行確認

AirFlowの管理画面で処理を確認する

MWAA環境完成が完成したので、AirFlowの管理画面(AirFlow UI)を開いてみます。

MWAA環境完成

DAG定義の一覧が表示されています。先ほど作成したfirst_dagが存在します。

DAG定義一覧

左側のトグルをONにして有効化します。

手動実行してみる

▶ボタン(Actions)を押して「Trigger DAG」を選択し、手動実行してみます。

first_dagの中身を見てみます。

Treeタブ 処理結果(Tree)

Graphタブ 処理結果(Graph)

いずれも緑になっているので、成功しているように見えます。

試しにGraphタブの「t4」をクリックして、「Log」を見てみます。

t4のLog

以下のようなログが出ています。

ログの中身

t4のログ
[2022-04-04, 15:13:06 UTC] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'echo t4']
[2022-04-04, 15:13:06 UTC] {{subprocess.py:85}} INFO - Output:
[2022-04-04, 15:13:06 UTC] {{subprocess.py:89}} INFO - t4[2022-04-04, 15:13:07 UTC] {{subprocess.py:93}} INFO - Command exited with return code 0
[2022-04-04, 15:13:07 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=first_dag, task_id=t4, execution_date=20220404T151257, start_date=20220404T151305, end_date=20220404T151307
[2022-04-04, 15:13:07 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2022-04-04, 15:13:07 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

bashコマンドで「t4」という文字列を出力する処理を入れていましたが、ログにも

t4

という値が出ていることがわかります。

エラーが出たらどうなるのか

DAG定義ファイルを書き換えて、t2のタスクがエラーとなるようにします。

dags/hello_airflow.py
t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

S3にアップロードすると、すぐにMWAAの定義も変わります。

MWAA定義

手動実行すると、t2が赤くエラーとなり、t4が黄色く待ち状態になっていることがわかります。

Treeタブ エラー処理結果(Tree)

Graphタブ エラー処理結果(Graph)

t2タスクのログを見てみると、

[2022-04-04, 15:29:27 UTC] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'exit 1']
[2022-04-04, 15:29:27 UTC] {{subprocess.py:85}} INFO - Output:
[2022-04-04, 15:29:27 UTC] {{subprocess.py:93}} INFO - Command exited with return code 1
[2022-04-04, 15:29:27 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):

コード値1で値が返却され、タスクがエラーになっていることがわかります。

その後、

[2022-04-04, 15:29:27 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=first_dag, task_id=t2, execution_date=20220404T152615, start_date=20220404T152926, end_date=20220404T152927
[2022-04-04, 15:29:27 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 16 for task t2

Marking task as FAILED first_dag自体が失敗したよ!というログが発生しています。

同じようなログが4つ出ているので、DAGファイルで設定した通り、3回リトライしているのだと思います。

ログが4件発生

待たされて終わったt4には、何もログが出ていません。

t4のログ

スケジューリングしておいて自動起動する

first_dagは手動で起動してしまったので、同じ設定のsecond_dagを作成してそのまま放置しました。

DAG一覧

自動で実行されたことがわかります。

3分置きになんども実行してくれると思ったのですが、1日1回起動という動きになっています。(時刻も謎)

継続して確認したいと思います。

まとめ

  • MWAAの実行環境を作成した。
  • MWAAの実行を定義するDAGファイルを作成した。
  • 実際にバッチ処理が動くことを確認した。

残課題

  • バッチの起動時刻や間隔を意図したものにする
  • echoだけではなく何か他のシステムと連携した処理を作ってみる

関連記事

https://bunsugi.com/mwaa-python-import

コメントはこちら



個別連絡はこちらへ→Twitterお問い合わせ

プロフィール

プロフィールイメージ

はち子

事業会社のシステム部門に異動して4年目の会社員。システム企画/要件定義/システムアーキテクチャ等。

Twitter→@bun_sugi

過去の記事について

はてなブログに掲載の記事(主にプログラミングメモ)についてはこちらに掲載しております。(本ブログに移行中)

タグ一覧

関連記事

Copyright© 2022, エンジニアを目指す日常ブログ

お問い合わせ|プライバシーポリシー