Airflow?
Apache Airflow๋ ๋ฐฐ์น ํ์ดํ๋ผ์ธ์ ๊ฐ๋ฐ, ์ค์ผ์ฅด๋ง, ๋ชจ๋ํฐ๋งํ๊ธฐ ์ํ ์คํ์์ค ํ๋ซํผ์
๋๋ค. Python ์ฝ๋๋ฅผ ์ด์ฉํด DAG(Directed Acyclic Graph, Task์ ์งํฉ)๋ฅผ ์ ์ํ๊ณ , ์ด๋ฅผ ํตํด ๋ฐฐ์น ์์
์ ๊ฐ๋ฐ, ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง ํ ์ ์์ต๋๋ค. ํ์ํ ๊ฒฝ์ฐ์๋ DAG ๊ฐ์ ์์กด์ฑ์ ์ ์ํ ์ ์์ด ์ ์ฉํฉ๋๋ค.
์์ดํ๋ก์ฐ๊ฐ ๋ฐ์ดํฐ ์์ง๋์ด๋ง์์ ๋ค๋ค์ง๋ ์ด์ ๋ ๋ฌด์์ผ๊น์? ์ค์ (๋ฐฐ์น์ฑ) ETL(ํน์ ELT) ์์
์ ๋งค์ฐ ๋ณต์กํ ์์์ ์์กด ๊ด๊ณ ์์์ ์งํ๋ฉ๋๋ค. ์์ดํ๋ก์ฐ๋ ํ์ด์ฌ ์ฝ๋๋ก ETL์ ์ธ๋ถ ๊ณผ์ ํ๋ํ๋๊ฐ ๊ฐ๊ธฐ ์ด๋ ์์กด์ฑ์ ๊ฐ์ง๊ณ , ์ธ์ ๋ถํฐ ์ธ์ ๊น์ง ์คํ๋์ด์ผ ํ ์ง๋ฅผ ์ปค์คํฐ๋ง์ด์ฆํ ์ ์์ต๋๋ค. (์ด๋ฏธ ๋ง๋ค์ด์ง ์ฌ๋ฌ ์คํผ๋ ์ดํฐ๋ฅผ ๊ฐ์ ธ๋ค๊ฐ ์ธ ์ ์๋ค๋ ์ ๋ ํฐ ์ฅ์ )
๋ํ ETL ํ๋ก์ธ์ค๋ฅผ ๋ก๊น
, ๋ชจ๋ํฐ๋งํด์ฃผ๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ ์์ง๋์ด๋ ์ํฌํ๋ก์ฐ๊ฐ ์ธ์ ์ด๋ป๊ฒ ์คํจํ์๋์ง ์ ํํ๊ฒ ์ธ์งํ ์ ์์ต๋๋ค. ๋์๊ฐ ์คํจํ ํํธ๋ฅผ ์ฌ์คํํ๋ backfill ๊ธฐ๋ฅ์ ์ง์ํ๊ณ ์์ต๋๋ค.
๋ณธ๊ฒฉ์ ์ธ ์ค๋ช ์ ์ฉ์ด ์ค๋ช
์ํฌํ๋ก์ฐ(Workflow)
์ํฌํ๋ก์ฐ๋ ์ฐ์์ ์ธ ์์
์ด๋ ๋จ๊ณ๋ฅผ ํตํด ํน์ ๋ชฉํ๋ฅผ ๋ฌ์ฑํ๊ธฐ ์ํ ์ผ๋ จ์ ๊ณผ์ ์ ๋ํ๋
๋๋ค. ๋ฐ์ดํฐ ์์ง๋์ด๋ง์์ ์ํฌํ๋ก์ฐ๋ ๋ฐ์ดํฐ ์์ง, ๋ณํ, ์ ์ฅ ๋ฑ์ ์์
๋ค์ ์กฐํฉํ์ฌ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๊ณผ์ ์ ์๋ฏธํฉ๋๋ค. ์ํฌํ๋ก์ฐ๋ฅผ ํจ์จ์ ์ผ๋ก ๊ด๋ฆฌํ๋ ค๋ฉด ์์
๋ค ๊ฐ์ ์์กด์ฑ, ์คํ ์์, ์ค์ผ์ค๋ง ๋ฑ์ ์ ํํ๊ฒ ์ค์ ํด์ผ ํฉ๋๋ค.
์ฌํ: ์ค์ผ์ฅด๋ฌ vs ์ค์ผ์คํธ๋ ์ดํฐ vs ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ
๋๊ทธ(DAG - Directed Acyclic Graph)
DAG๋ "์ ํฅ ๋น์ํ ๊ทธ๋ํ(Directed Acyclic Graph)"์ ์ฝ์๋ก, ์์
๋ค ๊ฐ์ ์์กด ๊ด๊ณ๋ฅผ ๊ทธ๋ํ๋ก ํํํ ๊ฒ์
๋๋ค. ๋ฐ์ดํฐ ์์ง๋์ด๋ง์์๋ DAG๋ฅผ ์ฌ์ฉํ์ฌ ์ํฌํ๋ก์ฐ์ ์์
๋ค๊ณผ ๊ฐ ์์
๋ค ๊ฐ์ ์คํ ์์ ๋ฐ ์์กด์ฑ์ ๋ํ๋
๋๋ค.
DAG ์์
๊ฐ ์์
์ DAG ๋ด์์ ๋
ธ๋(node)๋ก ํํ๋๋ฉฐ, ์์
๋ค ์ฌ์ด์ ์์กด์ฑ์ ๊ฐ์ (edge)์ผ๋ก ํํ๋ฉ๋๋ค. DAG์ ์ค์ํ ํน์ฑ์ ๋น์ํ์ฑ(์ฌ์ดํด์ด ์์)์ธ๋ฐ, ์ด๋ ๋ชจ๋ ์์
๋ค์ด ์ํ ์์ด ์คํ๋๋ ๊ฒ์ ๋ณด์ฅํฉ๋๋ค.
์คํผ๋ ์ดํฐ(Operator)
Operator๋ ํ์คํฌ(Task)๋ฅผ ์ด๋ป๊ฒ ์คํ์ํฌ์ง์ ๋ํด ์์ฑํด๋ ๊ฒ์
๋๋ค. ์ฌ๋ฌ ์คํผ๋ ์ดํฐ๋ฅผ ์ฎ์ด ํ๋์ DAG๋ฅผ ๊ตฌ์ฑํ ์ ์์ต๋๋ค. ์์ดํ๋ก์ฐ์๋ ์ฌ๋ฌ ์คํผ๋ ์ดํฐ๊ฐ ์์ผ๋ฉฐ, ๋ฏธ๋ฆฌ ๋ง๋ค์ด์ง ์คํผ๋ ์ดํฐ๋ฅผ ๊ฐ์ ธ์ ์ฌ์ฉํ ์๋ ์์ต๋๋ค.
โข
PythonOperator : Python ํจ์๋ฅผ ์ฒ๋ฆฌํ๋ ์คํผ๋ ์ดํฐ
โข
BashOperator : Bash ์ปค๋งจ๋๋ฅผ ์คํํ๋ ์คํผ๋ ์ดํฐ
โข
EmailOperator : ์ด๋ฉ์ผ์ ์ ์กํ๋ ์คํผ๋ ์ดํฐ
โข
SimpleHttpOperator : HTTP Request๋ฅผ ์ํํ๋ ์คํผ๋ ์ดํฐ
โข
Provider Packages ์์ ์ธ๋ถ ํด๋ผ์ฐ๋(AWS, GCP ๋ฑ) ์ ํ๊ณผ ์์ดํ๋ก์ฐ๋ฅผ ์ฐ๋ํ์ฌ ์ฌ์ฉํ ์ ์๋ ์คํผ๋ ์ดํฐ๋ค๋ ์กด์ฌ
ํ์คํฌ(Task)
Task๋ ์์
๋จ์๋ก ์ํฌํ๋ก์ฐ ๋ด์์ ์คํ๋๋ ๋จ์ ์์
์ ๋ํ๋
๋๋ค. Task๋ ์ค์ ๋ก ๋ฐ์ดํฐ ์ถ์ถ, ๋ณํ, ์ ์ฅ ๋ฑ์ ์์
์ ์ํํ๋ฉฐ, ๊ฐ๊ฐ์ Task๋ ์ํฌํ๋ก์ฐ์ ๋จ๊ณ๋ฅผ ๋ํ๋ด๊ฑฐ๋ ํน์ ์์
์ ์ํํ๋ ์ญํ ์ ํฉ๋๋ค. Airflow์์ Task๋ ์์
์ ์ต์ ์คํ ๋จ์์ด๋ฉฐ, DAG ๋ด์์ ์ ์๋ ์์
๋ค ๊ฐ์ ์์กด์ฑ ๋ฐ ์คํ ์์๋ฅผ ๊ด๋ฆฌํ๋ ์ค์ํ ์์์
๋๋ค.
์ฝ๊ฒ ๋งํด DAG๋ ์ฌ๋ฌ Operator๋ก ์ด๋ฃจ์ด์ ธ์์ผ๋ฉฐ, ์ด Operator๊ฐ ์ค์ ์ธ์คํด์คํ๋์ด ์คํ๋๋ ๊ฒ์ด Task์
๋๋ค.
์ฉ์ด๊ฐ ์ข ํท๊ฐ๋ ค๋ ๊ด์ฐฎ์ต๋๋ค. ์ฝ๋๋ณด๋ฉด ์ ๊ฐ๋
๋ค ํ ๋ฒ์ ์ดํดํ ์ ์์ต๋๋ค
์ค์ต 1: ์ฐ์ ๋์๋ด ์๋ค
๋ก์ปฌ์ด๋ Docker-Compose ๋ ์ค ํ๋๋ง ํด๋ ๋ฉ๋๋ค. ํ์ง๋ง ์ํํ ์ค์ต์ ์ํด์ docker compose ๋ฒ์ ์ผ๋ก ํด์ฃผ์ธ์. (๋ก์ปฌ ๋ฒ์ ์์๋ ์ค์ต 2๊ฐ ์ ๋์๊ฐ๋๋ค..!)
Docker-Compose๋ CeleryExecutor + Redis + PostgreSQL ๋ฒ์ , ๋ก์ปฌ์ LockerExecutor + sqlite ๋ฒ์ ์
๋๋ค
Docker-Compose ๋ฒ์
๋ก์ปฌ ๋ฒ์
Airflow ๊ตฌ์ฑ
DAG Directory
โข
airflow.cfg ํ์ผ์์ dags_folder๋ก ์ค์ ๊ฐ๋ฅ. ๊ธฐ๋ณธ์ $AIRFLOW_HOME/dags
โข
Airflow์์ DAG Directory์ ์๋ก ์๊ธด DAG ํ์ผ์ ์ด๋ป๊ฒ ์ธ์งํ ์ ์์๊น?
โฆ
Scheduler์์ dags_folder๋ก ์ง์ ๋ ํด๋, ๊ทธ๋ฆฌ๊ณ ๊ทธ ํ์ ํด๋์ ์๋ ๋ชจ๋ ํ์ด์ฌ ํ์ผ์ ์คํํด๋ณด๊ณ DAG ๋ชจ๋์ด import ๋์ด ์๋์ง ํ์ธํจ
โฆ
airflow.cfg์์ dags_dir_list_interval์์ ์ค์บ ์ฃผ๊ธฐ ์ค์ ๊ฐ๋ฅ (๊ธฐ๋ณธ์ 300์ด)
โฆ
์ ๋๋ก ๋ DAG์ ๋ํด์๋ ์ด๋ค DAG์ Task๊ฐ ์๋์ง ์๊ณ ๋๋์ง๋ง, ์ผ๋ฐ python ์คํฌ๋ฆฝํธ์ ๊ฒฝ์ฐ ๊ทธ๋๋ก ์คํํด๋ฒ๋ฆฌ๋ ๊ฒ. ์ ํํ๋ dag๋ฅผ ์คํํ๋ ๊ฒ์ด ์๋๋ผ python ์คํฌ๋ฆฝํธ์ main ํจ์๋ฅผ ์คํํ๋ค๋ ์๋ฏธ. ์ฌ๋ฌ python ์คํฌ๋ฆฝํธ ์ค dag ํ์ผ์ธ ๊ฒ์ airflow์ ์ถ๊ฐํด๋๋ค.
์ด๋ฐ ๋์ ๋งค์ปค๋์ฆ ๋๋ฌธ์ ์ฌ๋ฌ ๋ฌธ์ ๊ฐ ๋ฐ์.
ํ
์คํธ ์ฝ๋ ๋ฑ์ ๋ฃ์ด๋์๋ ์์ดํ๋ก์ฐ๊ฐ ์คํ์ํจ๋คโฆ ํน์ ํ
์ด๋ธ ๋ ๋ฆฌ๋ ํ
์คํธ ์ฝ๋๊ฐ ์๋ค๋ฉด ๊ทธ๊ฒ ํน์ ์ฃผ๊ธฐ๋ง๋ค ๊ณ์ ์คํ๋๋ ๊ฒ.
ํด๊ฒฐ์ฑ
์ dags ํด๋ ์์ .airflowignore ํ์ผ์ ๋ง๋ค์ด์ ๊ฑฐ๊ธฐ์ ํ
์คํธ ์ฝ๋ ๋ฃ์ด๋๊ธฐ
Scheduler
โข
๊ฐ์ข
๋ฉํ ์ ๋ณด์ ๊ธฐ๋ก์ ๋ด๋น
โข
DAG Directory ๋ด .py ํ์ผ์์ DAG๋ฅผ ํ์ฑํ์ฌ DB์ ์ ์ฅ
โข
DAG๋ค์ ์ค์ผ์ฅด๋ง ๊ด๋ฆฌ ๋ฐ ๋ด๋น
โข
์คํ ์งํ ์ํฉ๊ณผ ๊ฒฐ๊ณผ๋ฅผ DB์ ์ ์ฅ
โข
Executor๋ฅผ ํตํด ์ค์ ๋ก ์ค์ผ์ฅด๋ง๋ DAG๋ฅผ ์คํ
Scheduler - Executor
โข
Local Executor
โฆ
DAG Run์ ํ๋ก์ธ์ค ๋จ์๋ก ์คํ
โช
Local Executor
โข
ํ๋์ DAG Run์ ํ๋์ ํ๋ก์ธ์ค๋ก ๋์์ ์คํ
โข
์ต๋๋ก ์์ฑํ ํ๋ก๋ ์ค ์๋ฅผ ์ ํ์ฌ ์ฌ์ฉ
โช
Sequential Executor
โข
ํ๋์ ํ๋ก์ธ์ค์์ ๋ชจ๋ DAG Run๋ค์ ์ฒ๋ฆฌ
โข
Airflow์ ๊ธฐ๋ณธ Executor
โข
Remote Executor
โฆ
DAG Run์ ์ธ๋ถ ํ๋ก์ธ์ค๋ก ์คํ
โช
Celery Executor
โข
DAG Run์ Celery Worker Process๋ก ์คํ
โช
Kubernetes Executor
โข
DAG Run ํ๋๊ฐ Pod (== k8s ์์์ ์์ดํ๋ก์ฐ ์ฌ์ฉ ์ ์ด์ฉํ๋ executor)
Metadata DB
โข
๋ฉํ ์ ๋ณด๋ฅผ ์ ์ฅํ๋ ๊ณณ
โข
Scheduler์ ์ํด ๋ฉํ ๋ฐ์ดํฐ๊ฐ ์์
โข
Sqlite๊ฐ ๊ธฐ๋ณธ์ด๋ ๋ณดํต MySQL์ด๋ Postgres๋ฅผ ์ฌ์ฉ
โข
ํ์ฑํ DAG ์ ๋ณด, DAG Run ์ํ์ ์คํ ๋ด์ฉ, Task ์ ๋ณด ๋ฑ์ ์ ์ฅ
โข
User์ Role์ ๋ํ ์ ๋ณด ์ ์ฅ
Metadata DB์ ๋ด์ฉ์ fernet_key๋ฅผ ์ฌ์ฉํ์ฌ ์ํธํ ๊ฐ๋ฅ
โข
fernet_key๋ฅผ ์ธํ
์ ํ๋ฉด ์ํธํ๋์ง ์๊ณ ํ๋ ์ธ ํ
์คํธ๊ฐ ์ ์ฅ
โข
๊ทธ๋ฐ๋ฐ fernet_key๋ฅผ ์์ด๋ฒ๋ฆฌ๋ฉด db ๋ณต๊ตฌ๋ฅผ ๋ชปํจ;;;
Worker
โข
DAG๋ฅผ ์ค์ ๋ก ์คํ์ํด
โฆ
Scheduler์ ์ํด ์๊ธฐ๊ณ ์คํ๋จ
โฆ
Executor์ ๋ฐ๋ผ Worker์ ํํ๊ฐ ๋ค๋ฆ
โช
Celery ํน์ Local Executor์ ๊ฒฝ์ฐ Worker๋ ํ๋ก์ธ์ค
โช
K8S Executor์ ๊ฒฝ์ฐ Worker๋ ํ๋
โฆ
DAG Run์ ์คํํ๋ ๊ณผ์ ์์ ์๊ธด ๋ก๊ทธ๋ฅผ ์ ์ฅ
Webserver
โข
Web UI๋ฅผ ๋ด๋น (flask๋ก ๊ตฌํ)
โข
REST API๋ฅผ ์ ๊ณตํ๊ธฐ ๋๋ฌธ์ ๊ผญ Web UI๋ฅผ ํตํ์ง ์์๋ ํต์ ๊ฐ๋ฅ
โข
๋ณดํต ์ด ์น์๋ฒ๋ฅผ ์ด์ฉํ์ฌ DAG๋ฅผ ON/OFF, ๋ชจ๋ํฐ๋ง ํจ
โข
Metadata DB์ ํต์ ํ๋ฉฐ ์ ์ ์๊ฒ ํ์ํ ๋ฉํ ๋ฐ์ดํฐ๋ฅผ ์น ๋ธ๋ผ์ฐ์ ๋ก ๋ณด์ฌ์ฃผ๊ณ ์๊ฐํ ํจ
์ค์ต 2: DAG ๊ฐ๋ฐ ์ฐ๋จนํด๋ณด๊ธฐ
์ค์ต ์ - ๊ฐ๋ ์กฐ๊ธ ๋
DAG Parameter (not task parameter)
dag = DAG(
dag_id = "helloWorld",
start_date = datetime(2022,6,15), #์๋ ์์ํ์ด์ผ ํ ๋ ์ง ํ์ง๋ง ๋ช
์์ ์ผ๋ก ์์ ์ ํด์ฃผ๋ฉด dag๋ ์์ ์ ํจ
catchup=False, #start_date์ ์ค์ ์์์ผ ์ฌ์ด์ ์คํ ์ ๋ ๊ฒ์ ํ ๋ฒ์ ์คํํด์ฃผ๋ ์ต์
(๋ชจ๋ฅด๊ฒ ์ผ๋ฉด falseโฆ)
tags=['example'], #ํ๋ณ๋ก, ํ์คํฌ๋ณ๋ก tag ๋ถ์ผ ์ ์์ (DAG ์๋ฐฑ๊ฐ ๋๋ฉด ํ๋ํ๋ ๊ธฐ์ตํ ์ ์์. tag ํ์)
schedule = '0 2 * * *', #cron์ผ๋ก ํํ
default_args={ #dag ๋ฐ์ ์ํ task์ ์ ์ฉ๋๋ ํ๋ผ๋ฏธํฐ
'on_failure_callback': slack.on_failure_callback,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
)
Python
๋ณต์ฌ
โข
max_active_runs: DAG ์ธ์คํด์ค๊ฐ ํ ๋ฒ์ ๋ช ๊ฐ๊น์ง ๋์๊ฐ ์ ์๋๊ฐ
โข
max_active_tasks: ํ์คํฌ๋ค์ด ํ ๋ฒ์ ๋ช ๊ฐ๊น์ง ๋์๊ฐ ์ ์๋๊ฐ
โ ๋์ worker์ CPU ์ฝ์ด ์์ ์ํด upper limit์ด ๊ฑธ๋ฆผ (๋ง์ด ์ค์ ํด๋๋ ๋์์ ๋์๊ฐ ์ ์๋ ํ์๋ ์ ํ)
โข
catchup: whether to backfill pasts runs
โข
DAG ํ๋ผ๋ฏธํฐ์ Task ํ๋ผ๋ฏธํฐ ์ฐจ์ด์ ์ ์ดํดํด์ผ ํจ!
โฆ
DAG ํ๋ผ๋ฏธํฐ๋ DAG ๊ฐ์ฒด๋ฅผ ๋ง๋ค ๋ ์ง์ ํด์ค์ผ ํจ
Task Parameters - default_arg
โข
๋ค์ DAG ๊ฐ์ฒด๋ฅผ ๋ง๋ค ๋ ์ง์ ํจ
โข
ํ๋ผ๋ฏธํฐ ์์:
โฆ
owner: owner๊ฐ ๋๊ตฌ์ธ์ง
โฆ
email: ์คํจํ์ ๋ ๋๊ตฌ์๊ฒ ์ด๋ฉ์ผ?
โฆ
retries: ์คํจ์ ์ฌ์คํ ํ์
โฆ
retry_delay: ์ฌ์คํ ๊ฐ๊ฒฉ
โฆ
on_failure_callback: ์คํจํ์ ๋ ์คํํ ํจ์ (๋ณดํต ๋๊ตฌ์๊ฒ ์ด๋ฉ์ผ ๋ณด๋ด๋ผ ์ด๋ฐ ํจ์ ์ฌ์ฉํจ)
โฆ
on_success_callbak: ์ฑ๊ณตํ์ ๋ ์คํํ ํจ์
์ค์ต ์ - ์ฝ๋ ๊ตฌ๊ฒฝํด๋ณด๊ธฐ
์ฐ์ ์์๋ฅผ ๋ณด๋ฉด์ ๊ฐ์ ์ก์๋ด
์๋ค
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,6,14),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
Python
๋ณต์ฌ
์กฐ๊ธ ๋ ๋๋ฒจ๋กญ์ํค๋ฉด ์๋์ ๊ฐ์ ํํ๋ก ๊ฐ๋จํ ํํ ๊ฐ๋ฅํฉ๋๋ค
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
Python
๋ณต์ฌ
๋ณธ๊ฒฉ์ ์ผ๋ก ์ค์ตํด๋ณด๊ธฐ
๋ณธ๊ฒฉ์ ์ผ๋ก DAG ๊ฐ๋ฐ์ ํด๋ด
์๋ค.
์๋ ์ฝ๋๋ indeed์์ airflow ๊ธฐ์ ์คํ์ ์ฌ์ฉํ๋ ์ฑ์ฉ ๊ณต๊ณ ๋ฅผ ๊ฒ์, title์ ํฌ๋กค๋งํ์ฌ ์ ์ฅํ๋ ์์ฃผ์์ฃผ ๊ฐ๋จํ ETL์ airflow๋ก ๋๋ ค๋ณด๊ฒ ์ต๋๋ค.
๋ณธ๊ฒฉ์ ์ผ๋ก ์ค์ตํด๋ณด๊ธฐ - ๋ฏผ๊ฐํ ์ ๋ณด ๋ง์คํน
Airflow์์๋ Connection๊ณผ Variables์ ํตํด ๋ฏผ๊ฐํ DB ํน์ ํด๋ผ์ฐ๋ Connection ์ ๋ณด, ํ๊ฒฝ ๋ณ์๋ฅผ ๋ง์คํนํ ์ ์์ต๋๋ค. ์ด๋ฒ ์ค์ต์์ Connection์ ํ์ฉํด Postgresql Connection ์ ๋ณด๋ฅผ, Variables๋ฅผ ํ์ฉํด ๋ฐ์ดํฐ ์์ค ๋งํฌ๋ฅผ ๋ง์คํนํด๋ณด๊ฒ ์ต๋๋ค.
1.
Airflow Webserver์์ Admin์ Connections๋ฅผ ๋๋ฅธ๋ค
โข
+์ ๋๋ฌ ์๋ก์ด ์ปค๋ฅ์
์ ๋ฑ๋กํด์ค๋ค
3.
Connection Type์ โPostgresโ๋ก ๋ณ๊ฒฝํ๊ณ ์ ๋ณด๋ฅผ ์
๋ ฅํ๋ค
โข
Connection Id: postgres_conn_id
โข
Connection Type: Postgres
โข
Host: ์์ ์ Postgres ์ปจํ
์ด๋ ์ด๋ฆ
โข
Login: airflow
โข
password: airflow
โข
port: 5432
4.
Test ํ์ ์ฐ๊ฒฐ์ด ๋๋ค๋ฉด Save๋ฅผ ๋๋ฌ Connection์ ๋ฑ๋ก ์๋ฃํ๋ค
5.
Admin์์ Variables๋ฅผ ํด๋ฆญํ๋ค
6.
Connection๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก +๋ฅผ ๋๋ฌ ํ๊ฒฝ ๋ณ์๋ฅผ ์๋ก ๋ฑ๋กํ๋ค
7.
Variables๋ ํค-๋ฐธ๋ฅ ํ์์ผ๋ก ๋ฑ๋ก ๊ฐ๋ฅํ๋ค
โข
Key: job_search_url
8.
Save ๋ฒํผ์ ๋๋ฌ ๋ฑ๋ก
๋ณธ๊ฒฉ์ ์ผ๋ก ์ค์ตํด๋ณด๊ธฐ - DAG ์์ฑํ๊ธฐ
mkdir dags && cd dags
Python
๋ณต์ฌ
dags ํด๋์ boaz_simple_etl.py ํ์ผ์ ํ๋ ์์ฑํฉ๋๋ค (vi ํน์ IDE ํ์ฉ)
# boaz_simple_etl.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
import requests
from bs4 import BeautifulSoup
import csv
# ์ฌ์ฉํ Operator Import
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
def extract(link):
f = requests.get(link)
return f.text
def transform(text):
soup = BeautifulSoup(text, 'html.parser')
titles = []
tmp = soup.select(".title")
for title in tmp:
titles.append(title.get_text())
return titles
def load(titles):
with open("job_search_result.csv", "w") as file:
writer = csv.writer(file)
writer.writerow(titles)
def etl():
link = "https://kr.indeed.com/jobs?q=airflow"
text = extract(link)
titles = transform(text)
load(titles)
# DAG ์ค์
dag = DAG(
dag_id="boaz_simple_etl",
schedule_interval="@daily",
start_date=datetime(2022,6,15), #์๋ ์์ํ์ด์ผ ํ ๋ ์ง ํ์ง๋ง ๋ช
์์ ์ผ๋ก ์์ ์ ํด์ฃผ๋ฉด dag๋ ์์ ์ ํจ
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback, # ์คํจ์ SLACK ํจ์ ์์ฒญ
}, #dag ๋ฐ์ ์ํ task์ ์ ์ฉ๋๋ ํ๋ผ๋ฏธํฐ
tags=["BOAZ"], #ํ๋ณ๋ก, ํ์คํฌ๋ณ๋ก tag ๋ถ์ผ ์ ์์ (DAG ์๋ฐฑ๊ฐ ๋๋ฉด ํ๋ํ๋ ๊ธฐ์ตํ ์ ์์. tag ํ์)
catchup=False #start_date์ ์ค์ ์์์ผ ์ฌ์ด์ ์คํ ์ ๋ ๊ฒ์ ํ ๋ฒ์ ์คํํด์ฃผ๋ ์ต์
(๋ชจ๋ฅด๊ฒ ์ผ๋ฉด falseโฆ)
)
# Operator ๋ง๋ค๊ธฐ
creating_table = PostgresOperator(
task_id="creating_table",
postgres_conn_id="postgres_conn_id", # ์นUI์์ connection์ ๋ฑ๋กํด์ค์ผ ํจ.
sql='''
CREATE TABLE IF NOT EXISTS job_search_result (
title TEXT
)
''',
dag = dag
)
etl = PythonOperator(
task_id = "etl",
python_callable = etl, # ์คํํ ํ์ด์ฌ ํจ์
dag = dag
)
# ์ ์ฅ๋์๋ค๋ ์ฌ์ค echo
store_result = BashOperator(
task_id="store_result",
bash_command= "echo 'ETL Done'",
dag=dag
)
# ํ์ดํ๋ผ์ธ ๊ตฌ์ฑํ๊ธฐ
creating_table >> etl >> store_result
Python
๋ณต์ฌ
Webserver์์ DAG ์คํํด๋ณด๊ธฐ
dags ํด๋์์ dag๋ฅผ ์์ฑํ๊ฒ ๋๋ฉด scheduler๋ ์ด ํ์ผ์ ํ์ฑํ์ฌ web ui์์๋ ๋ณผ ์ ์๋๋ก ๋ํ๋
๋๋ค. dag๋ ์์ฑํ๋ค๊ณ ํด์ ๋ฐ๋ก ์คํ๋๋ ๊ฒ์ ์๋๋๋ค. ์ผ์ชฝ ํ ๊ธ ๋ฒํผ์ ํด๋ฆญํ์ฌ DAG๋ฅผ ์คํ์ํฌ ์ ์์ต๋๋ค.
์คํ ํ boaz_simple_etl์ ํด๋ฆญํ์ฌ ๋ค์ด๊ฐ๋ฉด ์๋์ ๊ฐ์ ํ๋ฉด์ ๋ณผ ์ ์์ต๋๋ค. ๊ฐ ํ์คํฌ์ ์งํ ์ํฉ๊ณผ ๋ฉํ ๋ฐ์ดํฐ๋ค์ ๋ณผ ์ ์์ต๋๋ค. ์ด๋ ๊ฐ ํ์คํฌ๊ฐ ์ด๋ก์์ด๋ผ๋ฉด ์ฑ๊ณต, ๋
ธ๋์์ด๋ผ๋ฉด ์คํ ์ค, ๋นจ๊ฐ์์ด๋ผ๋ฉด ์คํจํ๋ค๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค. (Graph๋ฅผ ํด๋ฆญํ์ฌ ์ํฌํ๋ก์ฐ ๊ณผ์ ์ ์กฐ๊ธ ๋ ์ง๊ด์ ์ผ๋ก ๋ณผ ์๋ ์์ต๋๋ค)
๋ง์ฝ ๊ณ์ํด์ ์คํ์ด ์ง์๋๊ฑฐ๋, ํ์คํฌ ์คํ์ ์คํจํ๋ค๋ฉด ๊ฐ ํ์คํฌ๋ฅผ ํด๋ฆญํ์ฌ log๋ฅผ ํ์ธํ ์ ์์ต๋๋ค. DAG ๋๋ฒ๊น
์ ์ด ์คํ ๋ก๊ทธ๋ค์ ๋ณด๋ฉฐ ํ ์ ์์ต๋๋ค.
์ฌํ - Xcom์ ํ์ฉํ ํ์คํฌ๋ผ๋ฆฌ์ ๋ฐ์ดํฐ ๊ตํ
์ฌํ - Decorator ์ฌ์ฉํ๊ธฐ
Airflow ์ ์๊ฐ
์์ดํ๋ก์ฐ๋ ์ฃผ์ ๊ธฐ๋ฅ ์ค ํ๋๋ ์ค์ผ์ฅด๋ง์
๋๋ค. ์ฆ, ์์ดํ๋ก์ฐ๊ฐ ์๊ฐ์ ์ด๋ป๊ฒ ๋ค๋ฃจ๋๊ฐ๋ ๋งค์ฐ ์ค์ํฉ๋๋ค (๊ทธ๋ฆฌ๊ณ ๊ณจ๋๋ฆฝ๋๋ค)
Airflow์ ํ์์กด
airflow.cfg์์ ํ์์กด ์ค์ ์ ํ ์ ์์ต๋๋ค. ๊ธฐ๋ณธ ํ์์กด์ UTC๋ฅผ ๋ฐ๋ฆ
๋๋ค.
โข
default_timezone: start_date, end_date, schedule์ ํ์์กด์ ์ํฅ
โข
default_ui_timezone: web server ์ฐ์ธก ์๋จ์ ํ์์กด ์ธํ
์ ๋ฐ์๋จ
ํ์๋ ์ด์ผ๊ธฐํ๊ฒ ์ง๋ง ํ์์กด๊ณผ ์ค์ผ์ฅด๋ง์ ๊ฝค ๊น๋ค๋ก์ด ์์
์ด๊ธฐ ๋๋ฌธ์ KST๊ฐ ์๋ UTC๋ฅผ ๊ธฐ์ค์ผ๋ก ์์
ํ๋ ๊ฒ์ด ์ด๋ณด์ ์
์ฅ์์ ์ ์ ๊ฑด๊ฐ์ ์ด๋กญ์ต๋๋ค
Airflow ์ค์ผ์ฅด๋ง - Legacy: start_date์ execution_date
์์ดํ๋ก์ฐ๋ ์ด๋ค ์๊ฐ์ ๊ธฐ์ค์ผ๋ก ์ธ์ ์ด๋ป๊ฒ ์ค์ผ์ฅด๋ง์ด ๋ ๊น์? ๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๋์์ ๊ฐ์ฅ ๋ฌธ์ ์ ์ธ ๊ฐ๋
์ธ start_date์ execution_date์ ๋ํด์ ๊ฐ๋จํ ์์๋ด
์๋ค
1.
์ํํ ์ดํด๋ฅผ ์ํด DAG์ ์คํ ์ฃผ๊ธฐ start_date execution_date๋ ์๋ก ๋ค๋ฅธ ๊ฐ๋
์์ ์ฐ์ ์์งํฉ๋๋ค.
2.
์์ดํ๋ก์ฐ์ ์ค์ผ์ฅด๋ง ์ปจ์
์ ์ผ ๋ฐฐ์น๋ฉด ํ๋ฃจ ์ ๊ธฐ์ค์ผ๋ก ๋๊ณ , ์๊ฐ ๋ฐฐ์น๋ฉด ์๊ฐ ์ ๊ธฐ์ค์ผ๋ก ๋๋ ๊ฒ. ์ฆ, ๋ฐฐ์น ๊ฐ๊ฒฉ๋งํผ์ ๊ณผ๊ฑฐ๋ฅผ ์ฝ์ด์ค๋ ๊ฒ์
๋๋ค.
3.
์์ดํ๋ก์ฐ๊ฐ ์คํ๋๋ ๊ฒ๊ณผ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์์ผ ํ๋ ์์ ์ ์ ๊ตฌ๋ถํด์ ์๊ฐํฉ๋๋ค
์ฝ๊ฒ DAG์ ์คํ ์ฃผ๊ธฐ๋ ํ๋ฃจ์์ ๊ฐ์ ํ๊ณ ์ค๋ช
ํฉ๋๋ค. ๋งค์ผ 00:01์ ์ ๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์์ ๊ณ์ฐํ ๋ค์ ๋ก๋ํ๋ DAG๋ผ๊ณ ๊ฐ์ ํ๊ฒ ์ต๋๋ค.
schedule_interval (DAG์ ์คํ ์ฃผ๊ธฐ)
โข
default๋ None
โข
schedule_interval="@daily" ์ ๊ฐ์ด @daily ๋งคํฌ๋ก๋ ๋งค์ผ ์์ ์ DAG๋ฅผ ์คํํ๋๋ก ์์ฝํฉ๋๋ค. ์ด ์ธ์๋ ๋ค์๊ณผ ๊ฐ์ ๋งคํฌ๋ก๋ฅผ ์ง์
โข
cron์ ํ์ฉํ์ฌ ์ค์ผ์ฅด๋ง ๊ฐ๋ฅ
โข
timedelta ์ธ์คํด์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋น๋ ๊ธฐ๋ฐ ์ค์ผ์ฅด์ ์ ์ ๊ฐ๋ฅ
ํ๋ฆฌ์
| ์ค๋ช
|
@once | 1ํ๋ง ์คํํ๋๋ก ์ค์ผ์ค |
@hourly | ๋งค์๊ฐ ๋ณ๊ฒฝ ์ 1ํ ์คํ |
@daily | ๋งค์ผ ์์ ์ 1ํ ์คํ |
@weekly | ๋งค์ฃผ ์ผ์์ผ ์์ ์ 1ํ ์คํ |
@monthly | ๋งค์ 1์ผ ์์ ์ 1ํ ์คํ |
@yearly | ๋งค๋
1์ 1์ผ ์์ ์ 1ํ ์คํ |
start_date
DAG๊ฐ ์ฒ์ ์คํ๋์ด์ผ ํ๋ ๋ ์ง๊ฐ ์๋ ์์ดํ๋ก์ฐ๊ฐ ์ด๋ค ๋ ์ง๋ฅผ ๊ธฐ์ค์ผ๋ก ์ฃผ๊ธฐ๋ฅผ ๊ณ์ฐํด์ผ ํ๋์ง์ ๋ํ ์ ๋ณด์
๋๋ค. ์ด๊ฒ ๋ฌด์จ ๋ง์ด๋
start_date๊ฐ 2023๋
1์ 1์ผ๋ก ์ค์ ๋์ด ์์ ๋ ๋ฐ์ผ๋ฆฌ ๋๊ทธ์ ์คํ๊ณผ ์ฒ๋ฆฌ๋ ์๋์ ๊ฐ์ต๋๋ค.
โข
์ฒ์ ์คํ๋๋ ๊ฒ์ 2023๋
1์ 2์ผ 00์ 01๋ถ
โข
์ฒ์ ์ฝ์ด์ค๋ ๋ฐ์ดํฐ๋ 2023๋
1์ 1์ผ์ ๋ฐ์ดํฐ
๋ง์ฝ start_date๊ฐ 2023๋
1์ 1์ผ์ด์ง๋ง ์คํํ๋ ์์ (web ui์์ ํ ๊ธ ๋ฒํผ์ ํ์ฑํํ๋ ์์ )์ด 2023๋
8์ 23์ผ์ด๋ผ๋ฉด ์ด๋ป๊ฒ ๋ ๊น์? (catchup์ false๋ผ๊ณ ๊ฐ์ ) ๋ฐ๋ก ์คํ๋๋ ๊ฒ์ด ์๋๋ผ
โข
์ฒ์ DAG๊ฐ ์คํ๋๋ ์์ ์ 2023๋
8์ 24์ผ 00์ 01๋ถ
โข
์ฒ์ ์ฝ์ด์ค๋ ๋ฐ์ดํฐ๋ 2023๋
8์ 23์ผ์ ๋ฐ์ดํฐ
DAG๊ฐ ์์๋๋ ๊ธฐ์ค ์์ . ๋ฐฐ์น ๊ฐ๊ฒฉ๋งํผ ๊ณผ๊ฑฐ๋ฅผ ์ฝ์ด์ค๋ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ start_date๋ฅผ ์์์ผ๋ก ๋ฐฐ์น * n ์๊ฐ์ด ํ๋ฌ๊ฐ ์์ ๋ถํฐ ๋ฐฐ์น * (n-1) ์ DAG ๋ฐฐ์น๊ฐ ๋์๊ฐ ๊ฒ์
๋๋ค.
ํ์ฌ ์๊ฐ์ด start_date๋ณด๋ค ์ด์ ์ด๋ฉด DAG๊ฐ ์คํ๋์ง ์์ต๋๋ค. DAG๋ฅผ ๋ง๋ค๊ณ ์ง๊ธ ๋ฐ๋ก ๋๋ ค๋ณด๊ณ ์ถ์ผ๋ฉด start_date๋ฅผ ์ต์ ํ๋ฃจ ์ ์ผ๋ก ์ก์์ค์ผ ํฉ๋๋ค.
execution_date
์์ดํ๋ก์ฐ๊ฐ ํน์ ๋ฐฐ์น์์ ์ฒ๋ฆฌํด์ผ ํ ๋ฐ์ดํฐ์ ์๊ฐ ๋ฒ์ ์ค ๊ฐ์ฅ ๋น ๋ฅธ ์์ ์
๋๋ค. execution_date๋ ์ค์ ์ฒ๋ฆฌ์ ์ฌ์ฉ๋๊ธฐ ๋ณด๋ค๋ dag run ์ธ์คํด์ค, task ์ธ์คํด์ค๋ฅผ ๊ตฌ๋ถํ๋ ID๊ฐ ๋๋ค. (dag run ์ธ์คํด์ค์ task ์ธ์คํด์ค ๊ฐ์ ๊ด๊ณ ๊ธฐ์ค์ด ๋๊ธฐ๋ ํฉ๋๋ค)
+ catchup
โข
DAG๊ฐ ์ฒ์ ํ์ฑํ๋ ์์ ์ด start_date๋ณด๋ค ๋ฏธ๋ ์์ ์ผ ๊ฒฝ์ฐ
โข
start_date์ ์ฒ์ ์คํ๋ ์๊ฐ ์ฌ์ด, ๋ฐฐ์น๊ฐ ๋์ง ์์ ์์ ๋ค์ ๋ํด์ ์ด๋ป๊ฒ ํ ๊ฒ์ธ์ง ๊ฒฐ์ ํด์ฃผ๋ ํ๋ผ๋ฏธํฐ
โฆ
True: ์คํ๋์ง ์์ ๊ฒ๋ค ๋ชจ๋ ์คํ
โฆ
False: ์คํํ์ง ์๊ณ ๋
๋
โข
metadata db์ ์คํ ์ฌ๋ถ ๊ธฐ๋ก์ด ๋จ๊ธฐ ๋๋ฌธ์ docker compose down ํ๋ค๊ฐ up๋์ด๋ ๋ค์ ์ฝ์ด์ค๋ ์ฃผ์ํ์ฌ ํ๋ผ๋ฏธํฐ ์ฌ์ฉ
Airflow ์ค์ผ์ฅด๋ง - New: data_interval
์ผ๋ถ ์ฌ๋๋ค์ด ์๋ ๊ฑฐ์ ๋๋ค์์ ์ฌ๋๋ค์ด execution_date๋ฅผ ๋งค์ฐ ํท๊ฐ๋ คํ๊ธฐ ๋๋ฌธ์,,, 2021๋
12์ 2์ผ Airflow 2.2์์ data_interval์ด๋ผ๋ ์๋ก์ด ๊ฐ๋
์ด ๋์
๋ฉ๋๋ค.
๋ชจ๋๊ฐ too confusing์ด๋ผ๊ณ ์ธ์ณค์โฆ
์ด์ ๋ ์กฐ๊ธ ๋ ์ง๊ด์ ์ผ๋ก ์๋์ ๊ฐ์ด ์ค์ ํ ์ ์์ต๋๋ค
โข
์ค์ ์์ดํ๋ก์ฐ๊ฐ ์ฝ์ด์ค๋ ์์ ์์ ์ data_interval_start
โข
์์ดํ๋ก์ฐ๊ฐ ์ธ์ ์ฝ์ด์ค๋ ๊ฒ์ ๋ง๋ฌด๋ฆฌํด์ผ ํ๋์ง์ ๋ํ ์์ ์ data_interval_end
๋ํ ์ค์ ์์
์ด ์คํ๋๋ ์์ ์ ํํํ๋๋ฐ ์์ด์๋ logical_date๋ผ๊ณ ํํํ๊ธฐ ์์ํ์์ต๋๋ค. (execution์ด๋ผ๋ ๋จ์ด ์์ฒด๊ฐ ํท๊ฐ๋ฆฌ๊ฒ ํ๋๋ฐ ํฐ ๋น์ค์ ์ฐจ์งํ๊ธด ํ์,,,)
์ฆ, ์์
์ดย ์คํ๋๋ ์์ ์ย logical_date๋ก ํํํ๊ณ , ํด๋น ์์ ์ ์คํ๋๋ย ์์
์ด ์ฐธ๊ณ ํด์ผ ํ ๋ฐ์ดํฐ์ ๋ ์ง ๋ฒ์๋ย data_interval_start์ย data_interval_end๋ก ๊ฐ๊ฐ ๋๋์ด ํํํ ์ ์๊ฒ ๋์์ต๋๋ค
๊ธฐ์กด์ ์ฌ์ฉ๋๋ ๋งคํฌ๋ก/๋ณ์๋ค๊ณผ ์๋ก ๋ณ๊ฒฝ๋๋ ๋ณ์๋ค์ ํ๋ก ์ ๋ฆฌํ๋ฉด ์๋์ ๊ฐ์ต๋๋ค.
๊ธฐ์กด ์ด๋ฆ | ์๋ก์ด ์ด๋ฆ |
execution_date | data_interval_start, logical_date |
next_execution_date | data_interval_end |
tomorrow_ds | deprecate |
tomorrow_ds_nodash | deprecate |
yesterday_ds | deprecate |
yesterday_ds_nodash | deprecate |
prev_execution_date | prev_logical_date |
next_execution_date | next_logical_date |
๊ทธ ์ธ์๋ย execution_date๋ Airflow ์ค์ผ์ค๋ง์ ํต์ฌ ๊ฐ๋
์ด๊ธฐ ๋๋ฌธ์, Airflow Metadata DB์์๋ ์ด๋ฏธ ํญ๋๊ฒ ์ฌ์ฉ๋๊ณ ์์์ต๋๋ค. ์ด์ ๋ฐ๋ผ ๋ณ๊ฒฝ๋๋ DBํ
์ด๋ธ ๊ตฌ์กฐ์ ๋ณ๊ฒฝ ๋ฑ ๋ ์์ธํ ์ฌํญ์ ํ์ธํ๋ ค๋ฉดย AIP-39 ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํด ์ฃผ์ธ์.
Airflow ์ค์ผ์ฅด๋ง - New: Timetable
์์ ํ์ฌย schedule_interval์ ํฌ๋ก ํํ์(Cron expression)๊ณผย timedelta๋ก๋ง ํํํ ์ ์๊ธฐ ๋๋ฌธ์ย '๊ณตํด์ผ์ ์ ์ธํ ์์
์ผ์๋ง ์์
'ย ๊ณผ ๊ฐ์ด ์ค์ผ์ค ์ค๊ฐ์ค๊ฐ์ ๊ตฌ๋ฉ์ด ๋ซ๋ ค ์๋ ๋ณต์กํ ํํ์ ์ค์ผ์ค์ ์ ์ํ ์ ์์์ต๋๋ค.
ํ์ง๋ง Airflow 2.2๋ฅผ ๊ธฐ์ ์ผ๋กย Timetable(https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html)์ด๋ผ๋ ์๋ก์ด ํด๋์ค๊ฐ ๋์
๋์์ต๋๋ค. ์์ธํ ์ฌ์ฉ ๋ฐฉ๋ฒ ๊ด๋ จํด์๋ ์๋ ์๋ฃ๋ฅผ ์ฐธ๊ณ ํด์ฃผ์ธ์