
DAG & Backfill

Date
2024/03/08
Category
Data Engineering
Tag
Apache Airflow
Detail

Apache Airflow DAG

•
DAG stands for Directed Acyclic Graph: a graph whose edges have a direction and that contains no cycles.
•
In Airflow, a DAG defines a workflow. It is a graph that expresses the execution order of multiple tasks and the dependencies between them.
•
An Airflow DAG is defined in a Python script, which declares the tasks, the dependencies between them, and the schedule on which they run.
•
Advantages of using DAGs (Directed Acyclic Graphs) in Apache Airflow:
1.
Visibility and understanding: a DAG makes the structure of a workflow visible. The graph view shows how tasks relate to one another, which makes the workflow easier to understand and to troubleshoot.
2.
Dependency management: a DAG defines the dependencies between tasks explicitly. For example, you can state precisely that task B runs only after task A has completed successfully.
3.
Retry and recovery: Airflow can automatically retry a task that fails. The DAG is where you configure the retry logic and what should happen to recover a task that has failed.
•
Retry: Airflow automatically runs a failed task again.
◦
Useful when a task fails because of an unexpected (often transient) error, or when the state of the system has changed and the task simply needs to be attempted again.
•
Recovery: the process of recovering a task that has failed.
◦
Used when you identify why the task failed, fix the underlying problem, and run the task again.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 import path

from error_alarm import run_slack_alarm  # custom helper module, not part of Airflow

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,  # number of retries
    'retry_delay': timedelta(minutes=5),  # interval between retries
}


# Called when the task has failed (after all retries are used up)
def on_failure_callback(context):
    run_slack_alarm(context)


# DAG definition
dag = DAG(
    'retry_and_recovery_example',
    default_args=default_args,
    description='A simple DAG with retry and recovery',
    schedule_interval=timedelta(days=1),
)


# A task that needs retry and recovery
def my_task_function():
    # Raising an exception makes Airflow retry the task automatically
    raise Exception("An error occurred")


task_with_retry_and_recovery = PythonOperator(
    task_id='task_with_retry_and_recovery',
    python_callable=my_task_function,
    retries=3,  # per-task number of retries (overrides default_args)
    retry_delay=timedelta(minutes=5),  # per-task interval between retries
    on_failure_callback=on_failure_callback,  # function called on failure
    dag=dag,
)
```
•
'retries': 3 → how many times a failed task is retried (with 3, the task runs at most 4 times in total)
•
'retry_delay': timedelta(minutes=5) → how long to wait between retries
•
on_failure_callback=on_failure_callback → function called once the task has failed and all retries are used up; use it to take whatever action the failure requires and start recovering the task, e.g. run_slack_alarm(context) to send a Slack alert
4.
Flexibility: a DAG makes a workflow easy to modify and extend. You can adjust the workflow by adding new tasks or changing existing ones.
•
A DAG definition typically consists of the following components:
◦
DAG object: an instance of the DAG class that defines the task flow and the schedule.
◦
Task: an individual unit of work to be performed within the DAG. Tasks are defined with operators such as PythonOperator or BashOperator, or from Python functions.
◦
Dependencies: the relationships between tasks. For example, a task can be set to run only after another task has completed successfully.
◦
Schedule: how often the tasks in the DAG run, e.g. every day or on a specific day of every week.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 import path

# DAG definition
dag = DAG(
    'my_dag',
    description='My example DAG',
    schedule_interval=timedelta(days=1),  # run once a day
    start_date=datetime(2023, 1, 1),
    catchup=False,  # do not automatically create runs for past intervals that were missed
)


# Task definitions
def task1():
    print("Task 1")


def task2():
    print("Task 2")


# Add the tasks to the DAG
t1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)
t2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)

# Dependency: task2 runs only after task1 succeeds
t1 >> t2
```
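To see why the graph has to be acyclic, here is a minimal sketch, in plain Python rather than Airflow, of how an execution order can be derived from dependencies such as `t1 >> t2`. It uses Kahn's topological-sort algorithm; the function name `topological_order` and the (upstream, downstream) pair format are illustrative assumptions, and Airflow's real scheduler does considerably more (task states, trigger rules, pools).

```python
from collections import deque


def topological_order(tasks, edges):
    """Return one valid execution order, or raise if the edges contain a cycle.

    `edges` holds (upstream, downstream) pairs: ("task1", "task2") plays the role of t1 >> t2.
    """
    indegree = {t: 0 for t in tasks}      # number of unfinished upstream tasks
    downstream = {t: [] for t in tasks}
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1

    # Start with tasks that have no upstream dependencies.
    ready = deque(t for t in tasks if indegree[t] == 0)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for nxt in downstream[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:  # every upstream task of `nxt` is done
                ready.append(nxt)

    # If some task never became ready, its dependencies form a cycle.
    if len(order) != len(tasks):
        raise ValueError("cycle detected: this graph is not a DAG")
    return order


print(topological_order(["extract", "transform", "load"],
                        [("extract", "transform"), ("transform", "load")]))
# ['extract', 'transform', 'load']
```

With a cycle such as A → B → A, no task ever has zero unfinished upstream tasks, so no valid order exists; this is exactly what the "acyclic" requirement rules out.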
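The retry settings described earlier (`retries`, `retry_delay`, `on_failure_callback`) can be modelled with a small, Airflow-free sketch. `run_with_retries` is a hypothetical helper, and the dict passed to the callback only stands in for Airflow's much richer task context. The point is the ordering: with `retries=3` a task gets up to four attempts, and the failure callback runs once, only after the last attempt fails.

```python
import time


def run_with_retries(func, retries=3, retry_delay_seconds=0.0, on_failure_callback=None):
    """Call `func`; on an exception, retry it up to `retries` more times."""
    attempts = retries + 1  # the first try plus `retries` retries
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            if attempt == attempts:
                # All retries are used up: notify once, then let the failure propagate.
                if on_failure_callback is not None:
                    on_failure_callback({"exception": exc, "try_number": attempt})
                raise
            time.sleep(retry_delay_seconds)  # wait before the next attempt


attempts_made = {"n": 0}


def flaky():
    # Fails twice with a transient error, then succeeds.
    attempts_made["n"] += 1
    if attempts_made["n"] < 3:
        raise RuntimeError("temporary error")
    return "ok"


print(run_with_retries(flaky, retries=3))  # ok
```

Here the callback never runs because the third attempt succeeds; it would only fire if all four attempts failed.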

Backfill

Incremental update

•
Appends only newly added data instead of reprocessing everything.
•
More efficient than reprocessing the entire dataset.
•
For example, when new records are added to a source database, Airflow can extract only the new records and load them into the data warehouse.
•
Only the new data is processed; older data has not changed, so there is no need to process it again.
•
The opposite approach is a Full Refresh, which reprocesses the entire dataset on every run.
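A minimal sketch of the difference between an incremental update and a full refresh, assuming (hypothetically) that each source row carries an `updated_at` timestamp and that rows already loaded never change. The "watermark" is the newest timestamp loaded so far; each incremental run appends only rows newer than it. The function and column names are illustrative, not part of Airflow.

```python
from datetime import datetime


def full_refresh(source_rows):
    # Rebuild the target from scratch: every source row is processed on every run.
    return list(source_rows)


def incremental_update(target_rows, source_rows, watermark):
    # Append only rows newer than the watermark; older rows are assumed unchanged.
    new_rows = [r for r in source_rows if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return target_rows + new_rows, new_watermark


source = [
    {"id": 1, "updated_at": datetime(2024, 3, 1)},
    {"id": 2, "updated_at": datetime(2024, 3, 2)},
]
target, watermark = incremental_update([], source, datetime.min)  # first run loads ids 1 and 2

source.append({"id": 3, "updated_at": datetime(2024, 3, 3)})
target, watermark = incremental_update(target, source, watermark)  # second run loads only id 3

print([r["id"] for r in target], watermark)  # [1, 2, 3] 2024-03-03 00:00:00
print(len(full_refresh(source)))  # 3 (a full refresh processes every row again)
```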

Airflow's execution_date

•
execution_date is NOT the date on which the task runs!
•
A daily batch runs in the early morning, after the day it processes has ended.
•
e.g. the job that aggregates the 24 hours of data from 2020-04-20 00:00:00 to 2020-04-21 00:00:00 runs at 2020-04-21 00:00:00.
•
Airflow's execution_date is the left bound (start) of the interval being processed, not the time the run actually executes: for the run above, execution_date is 2020-04-20 00:00:00.
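The relationship between when a daily run fires and its execution_date can be written out as a small calculation. This sketch assumes a daily schedule at midnight and ignores time zones; `daily_interval_for_run` is a hypothetical helper. For reference, Airflow 2.2 and later expose the same interval in the task context as `data_interval_start` / `data_interval_end`, and renamed `execution_date` to `logical_date`.

```python
from datetime import datetime, timedelta


def daily_interval_for_run(run_time):
    # The run that fires at (or shortly after) midnight covers the previous day.
    interval_end = run_time.replace(hour=0, minute=0, second=0, microsecond=0)
    # execution_date is the left (start) bound of that interval, not the run time.
    execution_date = interval_end - timedelta(days=1)
    return execution_date, interval_end


execution_date, interval_end = daily_interval_for_run(datetime(2020, 4, 21, 0, 0))
print(execution_date, interval_end)  # 2020-04-20 00:00:00 2020-04-21 00:00:00
```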

Airflow backfill

•
Airflow records the result of each backfill run per date, including whether it succeeded or failed.
•
Those dates, whether the run succeeded or failed, are passed to the ETL as arguments by the system. As a result, the data engineer does not have to calculate which date's data to process; the ETL simply uses the date the system provides (execution_date).
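A minimal sketch of why ETL code should depend only on the date it is given: the same per-date function can then be re-run for any past date. `etl_for_date` and `backfill` are hypothetical, and the dict stands in for a warehouse table partitioned by date. In Airflow itself the date would come from the run (for example the `{{ ds }}` template), and past dates can be run with the `airflow dags backfill` CLI command.

```python
from datetime import date, timedelta


def etl_for_date(warehouse, source_events, ds):
    # Hypothetical daily ETL driven only by the date it is given.
    # Overwriting the day's partition (instead of appending) keeps re-runs idempotent.
    warehouse[ds] = [e for e in source_events if e["date"] == ds]


def backfill(warehouse, source_events, start, end):
    # Run the ETL once per date in [start, end] and record the outcome per date.
    results = {}
    day = start
    while day <= end:
        try:
            etl_for_date(warehouse, source_events, day)
            results[day] = "success"
        except Exception:
            results[day] = "failed"
        day += timedelta(days=1)
    return results


events = [{"date": date(2024, 3, 1), "v": 1}, {"date": date(2024, 3, 2), "v": 2}]
warehouse = {}
backfill(warehouse, events, date(2024, 3, 1), date(2024, 3, 3))
backfill(warehouse, events, date(2024, 3, 1), date(2024, 3, 3))  # re-run the same range
print(len(warehouse[date(2024, 3, 1)]))  # 1 (no duplicates after the re-run)
```

Because each run receives its date from the caller, re-running a failed date only touches that date's partition.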