はじめに
AWSサービスのジョブ管理用サービスに、MWAA(Managed Workflows for Apache Airflow)というものがあります。
仕事でも必要な知識なので、理解のために触ってみました。 初めて触る人、なんとなく理解したい人に向けて共有します。
※超初心者です。
前提
- AWSのアカウントがあること。
MWAAとは
MWAAは、Apatch AirFlowというワークフロー管理システムをAWSマネジメントサービスとして提供したものです。
- 順序性をもつバッチ処理を、事前にスケジュールした時刻に自動起動させることができます。
- 中にバッチ処理を書いて実行もできます。
- AWSの他サービスと連携できます。
- マネージドサービスなので、比較的、管理は楽(なはず)です。
- 順序性や起動設定は、「DAG」と呼ばれるpythonファイルで記述します。
Amazon Managed Workflows for Apache Airflow (MWAA) とは - Amazon Managed Workflows for Apache Airflow
ジョブ管理サービスというと、他には日立のJP1等があります。
処理の流れをAirFlow、MWAAで管理するには
DAG定義
AirFlowの処理の流れは、DAG(Directed Acyclic Graph)と呼ばれる有向グラフで表されます。MWAAでも同様です。
このようなイメージです。
ー引用元:
aの処理が終わったらbとcの処理が動き出し、終わったらdの処理が動きます。
また、AirFlowではこれらの処理を、CRONのように「毎日何時から起動」とか「何分おきに起動」などと、スケジューリングすることができます。
DAGの定義は、Pythonで記述します。
実際にMWAAで処理を動かしてみる
MWAA環境の作成
まずはMWAA環境を構築してみます。
マネジメントコンソールから「MWAA」を検索します。
「環境を作成」を選択し、環境の詳細を設定します。
ここで、DAG定義ファイルが置いてあるS3の場所を指定する必要があるので、いったんS3バケットとDAG定義ファイルの作成に移ります。
S3バケット作成
S3バケットを作成します。
ここで、バケット名はairflow-〇〇〇
というようにairflow-で始まるものにする必要があります。
パブリックアクセスはブロックする必要があります。
また、バージョニングは有効にする必要があります。
そのほかは特別な設定は必要ないので、「バケットを作成」を実行します。
次に、作ったバケットの中にdags
フォルダを作成します。
MWAA環境へのS3設定
S3バケットにairflow-〇〇〇
を、DAGフォルダにairflow-〇〇〇/dags
を設定します。
MWAA用VPCを作成
VPCを選ぶ必要があるので、ここでは「MWAA VPCを作成」を選びます。
既存のVPCを使おうかと思いましたが、VPCにもいろいろと条件があるようなので、ここではVPCを作成してしまいます。
CloudFormationの画面に飛ぶので、そのまま作成します。
既存のVPCでやった場合
既存のVPCでやってみようと思いましたが、MWAA環境自体が「作成中」のまま止まってしまい上手くいきませんでした。
エラーの内容は以下。
You may need to check the execution role permissions policy for your environment, and that each of the VPC networking components required by the environment are configured to allow traffic. Troubleshooting:
既存VPCに作った仕組みと連携したい場合は、MWAAも既存のVPCに作る必要があるかもしれないので、実施するときは工夫が必要そうです。(ここは勉強不足)
MWAA環境のネットワークを設定
VPCが作成できたら、VPCを選択します。
サブネットは、AZ-1aとAZ-1cのプライベートサブネットをそれぞれ選びます。
ネットワークは「公開ネットワーク」を選びます。
セキュリティグループは新しく作成します。
その他設定
IAMロールは新しいものを作成します。
他の設定はそのままとします。
DAG定義を作成してアップロード
DAGの定義を作成します。
こちらのページのものをほぼそのまま使わせていただきました。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# DAGを作る
default_args = {
'owner': 'myname',
'depends_on_past': False,
'start_date': datetime(2022, 4, 3, 10, 0, 0),
'schedule_interval': timedelta(minutes=3),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG('first_dag', default_args=default_args)
# DAGと紐づくタスクを作る
t1 = BashOperator(
task_id='t1',
bash_command='echo t1',
dag=dag)
t2 = BashOperator(
task_id='t2',
bash_command='echo t2',
dag=dag)
t3 = BashOperator(
task_id='t3',
bash_command='echo "{{ params.greeting }}"',
params={'greeting': 'Hello, AirFlow!'},
dag=dag)
t4 = BashOperator(
task_id='t4',
bash_command='echo t4',
dag=dag
)
# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
これをS3のdags/
フォルダにアップロードします。
DAG定義の中身
default_args
では、各種パラメータを定義しています。
start_date
:スケジューリングの開始時刻schedule_interval
:実行間隔retries
:リトライ回数retry_delay
:リトライ間隔
t1
~t4
では、bash_command
のパラメータでbashコマンドを定義しています。
最後は各タスクの依存関係を定義しています。上記のようにset_upstream
を使うか、以下のように記載します。
t1 >> [t2, t3]
処理実行確認
AirFlowの管理画面で処理を確認する
MWAA環境完成が完成したので、AirFlowの管理画面(AirFlow UI)を開いてみます。
DAG定義の一覧が表示されています。先ほど作成したfirst_dag
が存在します。
左側のトグルをONにして有効化します。
手動実行してみる
▶ボタン(Actions)を押して「Trigger DAG」を選択し、手動実行してみます。
first_dagの中身を見てみます。
いずれも緑になっているので、成功しているように見えます。
試しにGraphタブの「t4」をクリックして、「Log」を見てみます。
以下のようなログが出ています。
[2022-04-04, 15:13:06 UTC] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'echo t4']
[2022-04-04, 15:13:06 UTC] {{subprocess.py:85}} INFO - Output:
[2022-04-04, 15:13:06 UTC] {{subprocess.py:89}} INFO - t4[2022-04-04, 15:13:07 UTC] {{subprocess.py:93}} INFO - Command exited with return code 0
[2022-04-04, 15:13:07 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=first_dag, task_id=t4, execution_date=20220404T151257, start_date=20220404T151305, end_date=20220404T151307
[2022-04-04, 15:13:07 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2022-04-04, 15:13:07 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
bashコマンドで「t4」という文字列を出力する処理を入れていましたが、ログにも
t4
という値が出ていることがわかります。
エラーが出たらどうなるのか
DAG定義ファイルを書き換えて、t2のタスクがエラーとなるようにします。
t2 = BashOperator(
task_id='t2',
bash_command='exit 1',
retries=3,
dag=dag)
S3にアップロードすると、すぐにMWAAの定義も変わります。
手動実行すると、t2
が赤くエラーとなり、t4
が黄色く待ち状態になっていることがわかります。
t2タスクのログを見てみると、
[2022-04-04, 15:29:27 UTC] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'exit 1']
[2022-04-04, 15:29:27 UTC] {{subprocess.py:85}} INFO - Output:
[2022-04-04, 15:29:27 UTC] {{subprocess.py:93}} INFO - Command exited with return code 1
[2022-04-04, 15:29:27 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
コード値1
で値が返却され、タスクがエラーになっていることがわかります。
その後、
[2022-04-04, 15:29:27 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=first_dag, task_id=t2, execution_date=20220404T152615, start_date=20220404T152926, end_date=20220404T152927
[2022-04-04, 15:29:27 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 16 for task t2
Marking task as FAILED
first_dag自体が失敗したよ!というログが発生しています。
同じようなログが4つ出ているので、DAGファイルで設定した通り、3回リトライしているのだと思います。
待たされて終わったt4
には、何もログが出ていません。
スケジューリングしておいて自動起動する
first_dagは手動で起動してしまったので、同じ設定のsecond_dagを作成してそのまま放置しました。
自動で実行されたことがわかります。
3分置きになんども実行してくれると思ったのですが、1日1回起動という動きになっています。(時刻も謎)
継続して確認したいと思います。
まとめ
- MWAAの実行環境を作成した。
- MWAAの実行を定義するDAGファイルを作成した。
- 実際にバッチ処理が動くことを確認した。
残課題
- バッチの起動時刻や間隔を意図したものにする
echo
だけではなく何か他のシステムと連携した処理を作ってみる
関連記事
コメントはこちら
AWSのワークフロー管理ツールMWAAを触ってみた記事を更新しました。MWAAは、Apatch AirFlowというワークフロー管理システムをAWSマネジメントサービスとして提供したものです。https://t.co/ZS7IQQbEE3#ブログ初心者
— ぶん (@bun_sugi) April 10, 2022