Apache Airflow DAG
โข
DAG๋ Directed Acyclic Graph์ ์ฝ์๋ก, ๋ฐฉํฅ์ฑ์ด ์๋ ๋น์ํ ๊ทธ๋ํ๋ฅผ ๋ํ๋ธ๋ค.
โข
Airflow์์ DAG๋ ์์
์ ํ๋ฆ์ด๋ ์์กด์ฑ์ ์ ์ํ๋๋ฐ ์ฌ์ฉ๋๋๋ฐ, ์ฌ๋ฌ ์์
๋ค ๊ฐ์ ์คํ ์์์ ์์กด์ฑ์ ํํํ๋ ๊ทธ๋ํ๋ผ๊ณ ๋ณด๋ฉด ๋๋ค.
โข
Airflow์์ DAG๋ Python ์คํฌ๋ฆฝํธ๋ก ์ ์๋๋ฉฐ, ์ด ์คํฌ๋ฆฝํธ๋ ์์
๋ค ๊ฐ์ ์์กด์ฑ ๋ฐ ์คํ ์ค์ผ์ค์ ๋ช
์ํ๋ค.
โข
Apache Airflow์ DAG (Directed Acyclic Graph) ์ฌ์ฉ ์์ ์ฅ์ :
1.
๊ฐ์์ฑ ๋ฐ ์ดํด๋: DAG๋ฅผ ์ฌ์ฉํ๋ฉด ์ํฌํ๋ก์ฐ์ ๊ตฌ์กฐ๋ฅผ ์๊ฐ์ ์ผ๋ก ํ์
ํ ์ ์๋ค. DAG ๊ทธ๋ํ๋ฅผ ํตํด ์์
๊ฐ์ ๊ด๊ณ๋ฅผ ์ฝ๊ฒ ์ดํดํ๊ณ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค.
2.
์์กด์ฑ ๊ด๋ฆฌ: DAG๋ฅผ ์ฌ์ฉํ๋ฉด ์์
(task) ๊ฐ์ ์์กด์ฑ์ ๋ช
ํํ๊ฒ ์ ์ํ ์ ์๋ค. ์์
A๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์ด์ผ ์์
B๊ฐ ์คํ๋๋ ๋ฑ์ ์์กด์ฑ์ ์ ํํ๊ฒ ํํํ ์ ์๋ค.
3.
์ฌ์๋ ๋ฐ ๋ณต๊ตฌ: Airflow๋ ์์
์ด ์คํจํ ๊ฒฝ์ฐ ์๋์ผ๋ก ์ฌ์๋ํ ์ ์๋ค. DAG๋ฅผ ํตํด ์ฌ์๋ ๋ก์ง์ ์ค์ ํ๊ณ ์คํจํ ์์
์ ์๋์ผ๋ก ๋ณต๊ตฌํ ์ ์๋ค.
โข
์ฌ์๋: ์์
์ด ์คํจํ ๊ฒฝ์ฐ Airflow๊ฐ ํด๋น ์์
์ ์๋์ผ๋ก ๋ค์ ์๋ํ๋ ๊ณผ์
โฆ
์์
์ด ์์์น ๋ชปํ ์ค๋ฅ๋ก ์คํจํ์ ๋, ์์คํ
์ํ๊ฐ ๋ฐ๋์ด ๋ค์ ์๋ํด์ผ ํ ๋ ๋ฑ์ ์ ์ฉ
โข
๋ณต๊ตฌ: ์์
์ด ์คํจํ ๊ฒฝ์ฐ Airflow๊ฐ ํด๋น ์์
์ ๋ณต๊ตฌํ๋ ๊ณผ์
โฆ
์์
์ด ์คํจํ ์์ธ์ ํ์
ํ๊ณ ํด๋น ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ์ฌ ์์
์ ๋ค์ ์คํํ ๋ ์ฌ์ฉ
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from error_alarm import run_slack_alarm
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 3, # ์ฌ์๋ ํ์
'retry_delay': timedelta(minutes=5), # ์ฌ์๋ ๊ฐ๊ฒฉ
}
# ์คํจ ์ ํธ์ถ๋ ํจ์
def on_failure_callback(context):
run_slack_alarm(context)
return
# DAG ์ ์
dag = DAG(
'retry_and_recovery_example',
default_args=default_args,
description='A simple DAG with retry and recovery',
schedule_interval=timedelta(days=1),
)
# ์ฌ์๋ ๋ฐ ๋ณต๊ตฌ๊ฐ ํ์ํ ์์
์ ์
def my_task_function():
# ์์ธ ๋ฐ์ ์ ์๋์ผ๋ก ์ฌ์๋๋ฉ๋๋ค.
raise Exception("An error occurred")
task_with_retry_and_recovery = PythonOperator(
task_id='task_with_retry_and_recovery',
python_callable=my_task_function,
retries=3, # ์์
๋ณ ์ฌ์๋ ํ์
retry_delay=timedelta(minutes=5), # ์์
๋ณ ์ฌ์๋ ๊ฐ๊ฒฉ
on_failure_callback=on_failure_callback, # ์คํจ ์ ํธ์ถ๋ ํจ์
dag=dag,
)
Python
๋ณต์ฌ
โข
'retries': 3# ์ฌ์๋ ํ์
โข
'retry_delay': timedelta(minutes=5) # ์ฌ์๋ ๊ฐ๊ฒฉ
โข
on_failure_callback=on_failure_callback # ์ฌ์๋ ์์
์ด ๋ชจ ์คํจํ์ ๋ ํธ์ถ๋ ํจ์ โ ์์
์คํจ์ ๋ํ ํ์ ์กฐ์น๋ฅผ ์ทจํ๊ณ ์์
์ ๋ณต๊ตฌํ๊ธฐ ์ํจ ex: run_slack_alarm(context)
4.
์ ์ฐ์ฑ: DAG๋ฅผ ์ฌ์ฉํ๋ฉด ์ํฌํ๋ก์ฐ๋ฅผ ์ฝ๊ฒ ์์ ํ๊ณ ํ์ฅํ ์ ์์ต๋๋ค. ์๋ก์ด ์์
์ ์ถ๊ฐํ๊ฑฐ๋ ๊ธฐ์กด ์์
์ ์์ ํ์ฌ ์ํฌํ๋ก์ฐ๋ฅผ ์ ์ฐํ๊ฒ ์กฐ์ ํ ์ ์๋ค.
โข
DAG ์ ์๋ ์ฃผ๋ก ๋ค์๊ณผ ๊ฐ์ ๊ตฌ์ฑ ์์๋ก ์ด๋ฃจ์ด์ง๋ค.
โฆ
DAG ๊ฐ์ฒดย : DAG ํด๋์ค์ ์ธ์คํด์ค๋ก, ์์
์ ํ๋ฆ๊ณผ ์ค์ผ์ค์ ์ ์
โฆ
์์
(Task)ย : DAG ๋ด์์ ์ํ๋์ด์ผ ํ๋ ๊ฐ๊ฐ์ ๋จ์ ์์
. ์์
์ PythonOperator, BashOperator, Python ํจ์ ๋ฑ์ผ๋ก ์ ์.
โฆ
์์กด์ฑ (Dependencies)ย : ์์
๊ฐ์ ์์กด์ฑ. ์ฆ, ์ด๋ค ์์
์ ๋ค๋ฅธ ์์
์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋ ํ์๋ง ์คํ๋ ์ ์๋๋ก ์ค์ ํ ์ ์์.
โฆ
์ค์ผ์ค (Schedule)ย : DAG ๋ด์ ์์
์ด ์คํ๋๋ ์ฃผ๊ธฐ์ ์ธ ์ค์ผ์ค์ ์ ์. ์๋ฅผ ๋ค์ด, ๋งค์ผ, ๋งค์ฃผ ํน์ ์์ผ ๋ฑ์ผ๋ก ์ค์ผ์คํ ์ ์์.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG ์ ์
dag = DAG(
'my_dag',
description='My example DAG',
schedule_interval=timedelta(days=1), # ๋งค์ผ ์คํ
start_date=datetime(2023, 1, 1),
catchup=False, # ๊ณผ๊ฑฐ ์คํ์์ ๋๋ฝ๋ ์์
์ ์ฌ์คํํ์ง ์์
)
# ์์
์ ์
def task1():
print("Task 1")
def task2():
print("Task 2")
# DAG์ ์์
์ถ๊ฐ
t1 = PythonOperator(
task_id='task1',
python_callable=task1,
dag=dag,
)
t2 = PythonOperator(
task_id='task2',
python_callable=task2,
dag=dag,
)
# ์์กด์ฑ ์ ์
t1 >> t2
Python
๋ณต์ฌ
Backfill
Incremental update
โข
์๋กญ๊ฒ ์ถ๊ฐ๋ ๋ถ๋ถ๋ง์ ๋ค์ ์ด์ด๋ถ์ด๋ ๋ฐฉ์
โข
์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๋ค์ ์ฒ๋ฆฌํ๋ ๊ฒ๋ณด๋ค ํจ์จ์
โข
์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ถ๊ฐ๋๋ ๊ฒฝ์ฐ, Airflow๋ฅผ ์ฌ์ฉํ์ฌ ์๋ก์ด ๋ฐ์ดํฐ๋ง ์ถ์ถํ์ฌ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ก๋ํ ์ ์์ต๋๋ค.
โข
์ด๋ฅผ ํตํด ์๋ก์ด ๋ฐ์ดํฐ๋ง ์ฒ๋ฆฌํ๊ณ ์ด์ ๋ฐ์ดํฐ๋ ๋ณ๊ฒฝ๋์ง ์์์ผ๋ฏ๋ก ๋ค์ ์ฒ๋ฆฌํ ํ์๊ฐ ์์ต๋๋ค.
โข
Incremental update์ ๋ฐ๋๋๋ ๊ฐ๋
์ผ๋ก๋ Full Refresh๊ฐ ์์ต๋๋ค.
Airflow์ execution_date
โข
execution_date: task๊ฐ ์คํ๋๋ ๋ ์ง๊ฐ ์๋!
โข
Daily batch๋ ํ๋ฃจ๊ฐ ๋ง๊ฐ๋ ์๋ฒฝ์ ์ํ๋จ.
โข
ex: 2020-04-20 00:00:00~2020-04-21 00:00:00 ๊น์ง์ ํ๋ฃจ 24์๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ง๊ณ๋ 2020-04-21 00:00:00์ ์คํ๋จ.
โข
Airflow์ execution_date๋ ํ์ฌ ์ํํ๋ ์ผ์๋ก๋ถํฐ left-bound๋ ๊ฐ์ด ์ ๊ณต๋จ.
Airflow backfill
โข
Airflow์์๋ ๋ ์ง ๋ณ๋ก Backfill์ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋กํ๊ณ ์ฑ๊ณต ์ฌ๋ถ๋ฅผ ๊ธฐ๋กํฉ๋๋ค.
โข
ํด๋น ์ฑ๊ณต๊ณผ ์คํจ์ ๋ ์ง๋ค์ ์์คํ
์์ ETL์ ์ธ์๋ก ์ ๊ณต๋๋ค. ๋๋ฌธ์ ๋ฐ์ดํฐ ์์ง๋์ด๋ ๋ฐ์ดํฐ์ ๋ ์ง๋ฅผ ๊ณ์ฐํ์ง ์๊ณ ์์คํ
์ด ์ง์ ํด์ค ๋ ์ง('execution_date')๋ฅผ ์ฌ์ฉ๋ง ํ๋ฉด ๋ฉ๋๋ค.