【AWS】MWAAでPythonプログラムをスケジュール実行する。

2022/4/10

(最終更新: 2022/4/10

はじめに

AWSサービスのジョブ管理用サービスに、MWAA(Managed Workflows for Apache Airflow)というものがあります。

理解のために触ってみました。

前回はDAGファイルの中でbashコマンドを実行しただけでしたが、将来的にもう少し複雑な処理をおこなうことを想定し、Pythonファイルをインポートして実行してみます。

前回の記事

【MWAA】AWSのワークフロー管理システムMWAAをさわってみた。

【MWAA】AWSのワークフロー管理システムMWAAをさわってみた。

AWSサービスのMWAA(Managed Workflows for Apache Airflow)を初めて使ってみます。MWAAは、Apatch AirFlowというワークフロー管理システムをAWSマネジメントサービスとして提供したものです。

https://bunsugi.com/mwaa-beginner

前提

前回の記事の実施内容を完了していることが前提です。

  • AWSのアカウントがあること。
  • MWAAの実行環境を作成していること。
  • DAGファイルを作成し、S3の専用ディレクトリ(airflow-〇〇〇/ dags/)に保存できていること。

事前準備:Pythonプログラムを作成する

ちゃちゃっと動作確認

Paiza.ioというページで、ブラウザ上でPythonを動かしてみることができます。

以下のコードを書いて、実行できることを確かめます。Pythonを触ったことが無いので、まずはこれくらいで。。

def my_function(x):
    return x + "でPythonファイルを動かしてみるよ"  
    
print(my_function("Apatch Airflow"))

my_functionという関数を定義し、print関数内で呼び出しています。

Python簡易プログラム実行結果

.pyファイル作成

関数を定義するファイルを作成します。関数は先ほどのものを利用します。

define_function.py
def my_function(x):
    return x + "でPythonファイルを動かしてみるよ"

このファイルは、S3のDAG定義ファイルと同じ場所airflow-〇〇〇/dags/に置いておきます。

DAGファイル作成

前回作成したDAGファイルを流用して、新たにワークフロー(third_dag)を作ります。

追加した部分をハイライトしています。

hello_airflow_3.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
from airflow.operators.python_operator import PythonOperatorfrom define_function import my_function
# DAGを作る
default_args = {
    'owner': 'xxx',
    'depends_on_past': False,
    'start_date': datetime(2022,4,5,1,0,tzinfo=pendulum.timezone('Asia/Tokyo')),
    'schedule_interval': timedelta(minutes=5),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('third_dag', default_args=default_args)
# DAGと紐づくタスクを作る
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='echo t2dayo',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

t5 = PythonOperator(    task_id='t5_print',    python_callable= my_function,    op_kwargs = {"x" : "Apache Airflow"},    dag=dag,)

# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
t3 >> t5

インポート

DAGファイルと同じ場所にdefilne_function.pyを置いているので、以下のようにインポートできます。

from define_function import my_function

タスク定義

新たにt5タスクを定義します。

python_callableに関数名を入れます。

op_kwargsにパラメータを設定します。

t5 = PythonOperator(
    task_id='t5_print',
    python_callable= my_function,
    op_kwargs = {"x" : "Apache Airflow"},
    dag=dag,
)

依存関係の定義

t3の後ろにt5を定義します。

t3 >> t5

set_upstreamは、PythonOperatorでは使えないようです。

DAGファイルアプロード

S3のairflow-〇〇〇/dags/hello_airflow_3.pyをアップロードします。

Airflow実行確認

管理画面の確認

Airflow管理画面を見るとthird_dagができていることがわかります。

Airflow管理画面

実行ボタンを押してみます。

Graphタブを見ると、実行が成功したことが確認できます。 実行結果(Graphタブ)

ログ確認

Graphタブから、t5_printをクリックし、Log画面でログを確認します。

[2022-04-10, 17:36:01 JST] {{taskinstance.py:1262}} INFO - Executing <Task(PythonOperator): t5_print> on 2022-04-10 08:35:55.141404+00:00
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:52}} INFO - Started process 2805 to run task
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:52}} INFO - Started process 2805 to run task
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:76}} INFO - Running: ['airflow', 'tasks', 'run', 'third_dag', 't5_print', 'manual__2022-04-10T08:35:55.141404+00:00', '--job-id', '80', '--raw', '--subdir', 'DAGS_FOLDER/hello_airflow_3.py', '--cfg-path', '/tmp/tmp9kfs6it7', '--error-file', '/tmp/tmpbrju4ve0']
[2022-04-10, 17:36:01 JST] {{standard_task_runner.py:77}} INFO - Job 80: Subtask t5_print
[2022-04-10, 17:36:02 JST] {{base_aws.py:401}} INFO - Airflow Connection: aws_conn_id=aws_default
[2022-04-10, 17:36:02 JST] {{base_aws.py:190}} INFO - No credentials retrieved from Connection
[2022-04-10, 17:36:02 JST] {{base_aws.py:93}} INFO - Creating session with aws_access_key_id=None region_name=ap-northeast-1
[2022-04-10, 17:36:02 JST] {{base_aws.py:168}} INFO - role_arn is None
[2022-04-10, 17:36:02 JST] {{logging_mixin.py:109}} INFO - Running <TaskInstance: third_dag.t5_print manual__2022-04-10T08:35:55.141404+00:00 [running]> on host ip-10-192-21-172.ap-northeast-1.compute.internal
[2022-04-10, 17:36:02 JST] {{taskinstance.py:1429}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=xxxx
AIRFLOW_CTX_DAG_ID=third_dag
AIRFLOW_CTX_TASK_ID=t5_print
AIRFLOW_CTX_EXECUTION_DATE=2022-04-10T08:35:55.141404+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-04-10T08:35:55.141404+00:00
[2022-04-10, 17:36:02 JST] {{python.py:152}} INFO - Done. Returned value was: Apache AirflowでPythonファイルを動かしてみるよ[2022-04-10, 17:36:02 JST] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=third_dag, task_id=t5_print, execution_date=20220410T083555, start_date=20220410T083601, end_date=20220410T083602
[2022-04-10, 17:36:02 JST] {{local_task_job.py:154}} INFO - Task exited with return code 0
[2022-04-10, 17:36:03 JST] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

Apache AirflowでPythonファイルを動かしてみるよと出力されたことがわかります。

まとめ

MWAAで、Pythonプログラムを読み込んでワークフローに組み込み、実行することができました。

残課題

  • バッチの起動時刻や間隔を意図したものにする
  • まともな処理を作ってみる(twitterのAPIとか使ってみたい)
  • StepFunctionやLambdaとの違いを確認したい

参考資料



個別連絡はこちらへ→Twitterお問い合わせ

プロフィール

プロフィールイメージ

はち子

事業会社のシステム部門で働く会社員。→転職してビジネス部門でシステム関連の業務を行っています。プロダクトマネージャー/システム企画/要件定義/システムアーキテクチャ等。

Twitter→@bun_sugi

過去の記事について

はてなブログに掲載の記事(主にプログラミングメモ)についてはこちらに掲載しております。(本ブログに移行中)

タグ一覧

関連記事

Copyright© 2025, エンジニアを目指す日常ブログ

お問い合わせ|プライバシーポリシー