はじめに
AWSサービスのジョブ管理用サービスに、MWAA(Managed Workflows for Apache Airflow)というものがあります。
理解のために触ってみました。
前回はDAGファイルの中でbashコマンドを実行しただけでしたが、将来的にもう少し複雑な処理をおこなうことを想定し、Pythonファイルをインポートして実行してみます。
前回の記事
前提
前回の記事の実施内容を完了していることが前提です。
- AWSのアカウントがあること。
- MWAAの実行環境を作成していること。
- DAGファイルを作成し、S3の専用ディレクトリ(
airflow-〇〇〇/ dags/
)に保存できていること。
事前準備:Pythonプログラムを作成する
ちゃちゃっと動作確認
Paiza.ioというページで、ブラウザ上でPythonを動かしてみることができます。
以下のコードを書いて、実行できることを確かめます。Pythonを触ったことが無いので、まずはこれくらいで。。
def my_function(x):
return x + "でPythonファイルを動かしてみるよ"
print(my_function("Apatch Airflow"))
my_function
という関数を定義し、print
関数内で呼び出しています。
.pyファイル作成
関数を定義するファイルを作成します。関数は先ほどのものを利用します。
def my_function(x):
return x + "でPythonファイルを動かしてみるよ"
このファイルは、S3のDAG定義ファイルと同じ場所airflow-〇〇〇/dags/
に置いておきます。
DAGファイル作成
前回作成したDAGファイルを流用して、新たにワークフロー(third_dag
)を作ります。
追加した部分をハイライトしています。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
from airflow.operators.python_operator import PythonOperatorfrom define_function import my_function
# DAGを作る
default_args = {
'owner': 'xxx',
'depends_on_past': False,
'start_date': datetime(2022,4,5,1,0,tzinfo=pendulum.timezone('Asia/Tokyo')),
'schedule_interval': timedelta(minutes=5),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG('third_dag', default_args=default_args)
# DAGと紐づくタスクを作る
t1 = BashOperator(
task_id='t1',
bash_command='echo t1',
dag=dag)
t2 = BashOperator(
task_id='t2',
bash_command='echo t2dayo',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='t3',
bash_command='echo "{{ params.greeting }}"',
params={'greeting': 'Hello, AirFlow!'},
dag=dag)
t4 = BashOperator(
task_id='t4',
bash_command='echo t4',
dag=dag
)
t5 = PythonOperator( task_id='t5_print', python_callable= my_function, op_kwargs = {"x" : "Apache Airflow"}, dag=dag,)
# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
t3 >> t5
インポート
DAGファイルと同じ場所にdefilne_function.py
を置いているので、以下のようにインポートできます。
from define_function import my_function
タスク定義
新たにt5
タスクを定義します。
python_callable
に関数名を入れます。
op_kwargs
にパラメータを設定します。
t5 = PythonOperator(
task_id='t5_print',
python_callable= my_function,
op_kwargs = {"x" : "Apache Airflow"},
dag=dag,
)
依存関係の定義
t3
の後ろにt5
を定義します。
t3 >> t5
set_upstream
は、PythonOperator
では使えないようです。
DAGファイルアプロード
S3のairflow-〇〇〇/dags/
にhello_airflow_3.py
をアップロードします。
Airflow実行確認
管理画面の確認
Airflow管理画面を見るとthird_dag
ができていることがわかります。
実行ボタンを押してみます。
ログ確認
Graphタブから、t5_print
をクリックし、Log画面でログを確認します。
[2022-04-10, 17:36:01 JST] {{taskinstance.py:1262}} INFO - Executing <Task(PythonOperator): t5_print> on 2022-04-10 08:35:55.141404+00:00
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:52}} INFO - Started process 2805 to run task
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:52}} INFO - Started process 2805 to run task
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:76}} INFO - Running: ['airflow', 'tasks', 'run', 'third_dag', 't5_print', 'manual__2022-04-10T08:35:55.141404+00:00', '--job-id', '80', '--raw', '--subdir', 'DAGS_FOLDER/hello_airflow_3.py', '--cfg-path', '/tmp/tmp9kfs6it7', '--error-file', '/tmp/tmpbrju4ve0']
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:77}} INFO - Job 80: Subtask t5_print
[2022-04-10, 17:36:02 JST] {{base_aws.py:401}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-04-10, 17:36:02 JST] {{base_aws.py:190}} INFO - No credentials retrieved from Connection
[2022-04-10, 17:36:02 JST] {{base_aws.py:93}} INFO - Creating session with aws_access_key_id=None region_name=ap-northeast-1
[2022-04-10, 17:36:02 JST] {{base_aws.py:168}} INFO - role_arn is None
[2022-04-10, 17:36:02 JST] {{logging_mixin.py:109}} INFO - Running <TaskInstance: third_dag.t5_print manual__2022-04-10T08:35:55.141404+00:00 [running]> on host ip-10-192-21-172.ap-northeast-1.compute.internal
[2022-04-10, 17:36:02 JST] {{taskinstance.py:1429}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=xxxx
AIRFLOW_CTX_DAG_ID=third_dag
AIRFLOW_CTX_TASK_ID=t5_print
AIRFLOW_CTX_EXECUTION_DATE=2022-04-10T08:35:55.141404+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-04-10T08:35:55.141404+00:00
[2022-04-10, 17:36:02 JST] {{python.py:152}} INFO - Done. Returned value was: Apache AirflowでPythonファイルを動かしてみるよ[2022-04-10, 17:36:02 JST] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=third_dag, task_id=t5_print, execution_date=20220410T083555, start_date=20220410T083601, end_date=20220410T083602
[2022-04-10, 17:36:02 JST] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2022-04-10, 17:36:03 JST] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
Apache AirflowでPythonファイルを動かしてみるよ
と出力されたことがわかります。
まとめ
MWAAで、Pythonプログラムを読み込んでワークフローに組み込み、実行することができました。
残課題
- バッチの起動時刻や間隔を意図したものにする
- まともな処理を作ってみる(twitterのAPIとか使ってみたい)
- StepFunctionやLambdaとの違いを確認したい