๐Ÿฅž BE
home

Airflow Basic

Date
2023/10/23
Category
Data Engineering
Tag
Apache Airflow
Detail
Boaz Base Session

Airflow?

Apache Airflow๋Š” ๋ฐฐ์น˜ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ฐœ๋ฐœ, ์Šค์ผ€์ฅด๋ง, ๋ชจ๋‹ˆํ„ฐ๋งํ•˜๊ธฐ ์œ„ํ•œ ์˜คํ”ˆ์†Œ์Šค ํ”Œ๋žซํผ์ž…๋‹ˆ๋‹ค. Python ์ฝ”๋“œ๋ฅผ ์ด์šฉํ•ด DAG(Directed Acyclic Graph, Task์˜ ์ง‘ํ•ฉ)๋ฅผ ์ •์˜ํ•˜๊ณ , ์ด๋ฅผ ํ†ตํ•ด ๋ฐฐ์น˜ ์ž‘์—…์„ ๊ฐœ๋ฐœ, ์Šค์ผ€์ค„๋ง, ๋ชจ๋‹ˆํ„ฐ๋ง ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•„์š”ํ•œ ๊ฒฝ์šฐ์—๋Š” DAG ๊ฐ„์˜ ์˜์กด์„ฑ์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ์–ด ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค.
์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋ง์—์„œ ๋‹ค๋ค„์ง€๋Š” ์ด์œ ๋Š” ๋ฌด์—‡์ผ๊นŒ์š”? ์‹ค์ œ (๋ฐฐ์น˜์„ฑ) ETL(ํ˜น์€ ELT) ์ž‘์—…์€ ๋งค์šฐ ๋ณต์žกํ•œ ์ˆœ์„œ์™€ ์˜์กด ๊ด€๊ณ„ ์†์—์„œ ์ง„ํ–‰๋ฉ๋‹ˆ๋‹ค. ์—์–ดํ”Œ๋กœ์šฐ๋Š” ํŒŒ์ด์ฌ ์ฝ”๋“œ๋กœ ETL์˜ ์„ธ๋ถ€ ๊ณผ์ • ํ•˜๋‚˜ํ•˜๋‚˜๊ฐ€ ๊ฐ๊ธฐ ์–ด๋Š ์˜์กด์„ฑ์„ ๊ฐ€์ง€๊ณ , ์–ธ์ œ๋ถ€ํ„ฐ ์–ธ์ œ๊นŒ์ง€ ์‹คํ–‰๋˜์–ด์•ผ ํ• ์ง€๋ฅผ ์ปค์Šคํ„ฐ๋งˆ์ด์ฆˆํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. (์ด๋ฏธ ๋งŒ๋“ค์–ด์ง„ ์—ฌ๋Ÿฌ ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ๋‹ค๊ฐ€ ์“ธ ์ˆ˜ ์žˆ๋‹ค๋Š” ์ ๋„ ํฐ ์žฅ์ )
๋˜ํ•œ ETL ํ”„๋กœ์„ธ์Šค๋ฅผ ๋กœ๊น…, ๋ชจ๋‹ˆํ„ฐ๋งํ•ด์ฃผ๊ธฐ ๋•Œ๋ฌธ์— ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋Š” ์›Œํฌํ”Œ๋กœ์šฐ๊ฐ€ ์–ธ์ œ ์–ด๋–ป๊ฒŒ ์‹คํŒจํ•˜์˜€๋Š”์ง€ ์ •ํ™•ํ•˜๊ฒŒ ์ธ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‚˜์•„๊ฐ€ ์‹คํŒจํ•œ ํŒŒํŠธ๋ฅผ ์žฌ์‹คํ–‰ํ•˜๋Š” backfill ๊ธฐ๋Šฅ์„ ์ง€์›ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.

๋ณธ๊ฒฉ์ ์ธ ์„ค๋ช… ์ „ ์šฉ์–ด ์„ค๋ช…

์›Œํฌํ”Œ๋กœ์šฐ(Workflow)
์›Œํฌํ”Œ๋กœ์šฐ๋Š” ์—ฐ์†์ ์ธ ์ž‘์—…์ด๋‚˜ ๋‹จ๊ณ„๋ฅผ ํ†ตํ•ด ํŠน์ • ๋ชฉํ‘œ๋ฅผ ๋‹ฌ์„ฑํ•˜๊ธฐ ์œ„ํ•œ ์ผ๋ จ์˜ ๊ณผ์ •์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋ง์—์„œ ์›Œํฌํ”Œ๋กœ์šฐ๋Š” ๋ฐ์ดํ„ฐ ์ˆ˜์ง‘, ๋ณ€ํ™˜, ์ €์žฅ ๋“ฑ์˜ ์ž‘์—…๋“ค์„ ์กฐํ•ฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๊ณผ์ •์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ์›Œํฌํ”Œ๋กœ์šฐ๋ฅผ ํšจ์œจ์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๋ ค๋ฉด ์ž‘์—…๋“ค ๊ฐ„์˜ ์˜์กด์„ฑ, ์‹คํ–‰ ์ˆœ์„œ, ์Šค์ผ€์ค„๋ง ๋“ฑ์„ ์ •ํ™•ํ•˜๊ฒŒ ์„ค์ •ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
์‹ฌํ™”: ์Šค์ผ€์ฅด๋Ÿฌ vs ์˜ค์ผ€์ŠคํŠธ๋ ˆ์ดํ„ฐ vs ์›Œํฌํ”Œ๋กœ์šฐ ๊ด€๋ฆฌ
๋Œ€๊ทธ(DAG - Directed Acyclic Graph)
DAG๋Š” "์œ ํ–ฅ ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„(Directed Acyclic Graph)"์˜ ์•ฝ์ž๋กœ, ์ž‘์—…๋“ค ๊ฐ„์˜ ์˜์กด ๊ด€๊ณ„๋ฅผ ๊ทธ๋ž˜ํ”„๋กœ ํ‘œํ˜„ํ•œ ๊ฒƒ์ž…๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ์—”์ง€๋‹ˆ์–ด๋ง์—์„œ๋Š” DAG๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์›Œํฌํ”Œ๋กœ์šฐ์˜ ์ž‘์—…๋“ค๊ณผ ๊ฐ ์ž‘์—…๋“ค ๊ฐ„์˜ ์‹คํ–‰ ์ˆœ์„œ ๋ฐ ์˜์กด์„ฑ์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค.
DAG ์˜ˆ์‹œ
๊ฐ ์ž‘์—…์€ DAG ๋‚ด์—์„œ ๋…ธ๋“œ(node)๋กœ ํ‘œํ˜„๋˜๋ฉฐ, ์ž‘์—…๋“ค ์‚ฌ์ด์˜ ์˜์กด์„ฑ์€ ๊ฐ„์„ (edge)์œผ๋กœ ํ‘œํ˜„๋ฉ๋‹ˆ๋‹ค. DAG์˜ ์ค‘์š”ํ•œ ํŠน์„ฑ์€ ๋น„์ˆœํ™˜์„ฑ(์‚ฌ์ดํด์ด ์—†์Œ)์ธ๋ฐ, ์ด๋Š” ๋ชจ๋“  ์ž‘์—…๋“ค์ด ์ˆœํ™˜ ์—†์ด ์‹คํ–‰๋˜๋Š” ๊ฒƒ์„ ๋ณด์žฅํ•ฉ๋‹ˆ๋‹ค.
์˜คํผ๋ ˆ์ดํ„ฐ(Operator)
Operator๋Š” ํƒœ์Šคํฌ(Task)๋ฅผ ์–ด๋–ป๊ฒŒ ์‹คํ–‰์‹œํ‚ฌ์ง€์— ๋Œ€ํ•ด ์ž‘์„ฑํ•ด๋‘” ๊ฒƒ์ž…๋‹ˆ๋‹ค. ์—ฌ๋Ÿฌ ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ์—ฎ์–ด ํ•˜๋‚˜์˜ DAG๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์—์–ดํ”Œ๋กœ์šฐ์—๋Š” ์—ฌ๋Ÿฌ ์˜คํผ๋ ˆ์ดํ„ฐ๊ฐ€ ์žˆ์œผ๋ฉฐ, ๋ฏธ๋ฆฌ ๋งŒ๋“ค์–ด์ง„ ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€ ์‚ฌ์šฉํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.
โ€ข
PythonOperator : Python ํ•จ์ˆ˜๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ
โ€ข
BashOperator : Bash ์ปค๋งจ๋“œ๋ฅผ ์‹คํ–‰ํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ
โ€ข
EmailOperator : ์ด๋ฉ”์ผ์„ ์ „์†กํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ
โ€ข
SimpleHttpOperator : HTTP Request๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ
โ€ข
Provider Packages ์•ˆ์— ์™ธ๋ถ€ ํด๋ผ์šฐ๋“œ(AWS, GCP ๋“ฑ) ์ œํ’ˆ๊ณผ ์—์–ดํ”Œ๋กœ์šฐ๋ฅผ ์—ฐ๋™ํ•˜์—ฌ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ๋“ค๋„ ์กด์žฌ
ํƒœ์Šคํฌ(Task)
Task๋Š” ์ž‘์—… ๋‹จ์œ„๋กœ ์›Œํฌํ”Œ๋กœ์šฐ ๋‚ด์—์„œ ์‹คํ–‰๋˜๋Š” ๋‹จ์œ„ ์ž‘์—…์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค. Task๋Š” ์‹ค์ œ๋กœ ๋ฐ์ดํ„ฐ ์ถ”์ถœ, ๋ณ€ํ™˜, ์ €์žฅ ๋“ฑ์˜ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋ฉฐ, ๊ฐ๊ฐ์˜ Task๋Š” ์›Œํฌํ”Œ๋กœ์šฐ์˜ ๋‹จ๊ณ„๋ฅผ ๋‚˜ํƒ€๋‚ด๊ฑฐ๋‚˜ ํŠน์ • ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š” ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค. Airflow์—์„œ Task๋Š” ์ž‘์—…์˜ ์ตœ์†Œ ์‹คํ–‰ ๋‹จ์œ„์ด๋ฉฐ, DAG ๋‚ด์—์„œ ์ •์˜๋œ ์ž‘์—…๋“ค ๊ฐ„์˜ ์˜์กด์„ฑ ๋ฐ ์‹คํ–‰ ์ˆœ์„œ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ์ค‘์š”ํ•œ ์š”์†Œ์ž…๋‹ˆ๋‹ค.
์‰ฝ๊ฒŒ ๋งํ•ด DAG๋Š” ์—ฌ๋Ÿฌ Operator๋กœ ์ด๋ฃจ์–ด์ ธ์žˆ์œผ๋ฉฐ, ์ด Operator๊ฐ€ ์‹ค์ œ ์ธ์Šคํ„ด์Šคํ™”๋˜์–ด ์‹คํ–‰๋˜๋Š” ๊ฒƒ์ด Task์ž…๋‹ˆ๋‹ค.
์šฉ์–ด๊ฐ€ ์ข€ ํ—ท๊ฐˆ๋ ค๋„ ๊ดœ์ฐฎ์Šต๋‹ˆ๋‹ค. ์ฝ”๋“œ๋ณด๋ฉด ์œ„ ๊ฐœ๋…๋“ค ํ•œ ๋ฒˆ์— ์ดํ•ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค

์‹ค์Šต 1: ์šฐ์„  ๋„์›Œ๋ด…์‹œ๋‹ค

๋กœ์ปฌ์ด๋‚˜ Docker-Compose ๋‘˜ ์ค‘ ํ•˜๋‚˜๋งŒ ํ•ด๋„ ๋ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ์›ํ™œํ•œ ์‹ค์Šต์„ ์œ„ํ•ด์„œ docker compose ๋ฒ„์ „์œผ๋กœ ํ•ด์ฃผ์„ธ์š”. (๋กœ์ปฌ ๋ฒ„์ „์—์„œ๋Š” ์‹ค์Šต 2๊ฐ€ ์•ˆ ๋Œ์•„๊ฐ‘๋‹ˆ๋‹ค..!)
Docker-Compose๋Š” CeleryExecutor + Redis + PostgreSQL ๋ฒ„์ „, ๋กœ์ปฌ์€ LockerExecutor + sqlite ๋ฒ„์ „์ž…๋‹ˆ๋‹ค

Docker-Compose ๋ฒ„์ „

๋กœ์ปฌ ๋ฒ„์ „

Airflow ๊ตฌ์„ฑ

DAG Directory

โ€ข
airflow.cfg ํŒŒ์ผ์—์„œ dags_folder๋กœ ์„ค์ • ๊ฐ€๋Šฅ. ๊ธฐ๋ณธ์€ $AIRFLOW_HOME/dags
โ€ข
Airflow์—์„œ DAG Directory์— ์ƒˆ๋กœ ์ƒ๊ธด DAG ํŒŒ์ผ์„ ์–ด๋–ป๊ฒŒ ์ธ์ง€ํ•  ์ˆ˜ ์žˆ์„๊นŒ?
โ—ฆ
Scheduler์—์„œ dags_folder๋กœ ์ง€์ •๋œ ํด๋”, ๊ทธ๋ฆฌ๊ณ  ๊ทธ ํ•˜์œ„ ํด๋”์— ์žˆ๋Š” ๋ชจ๋“  ํŒŒ์ด์ฌ ํŒŒ์ผ์„ ์‹คํ–‰ํ•ด๋ณด๊ณ  DAG ๋ชจ๋“ˆ์ด import ๋˜์–ด ์žˆ๋Š”์ง€ ํ™•์ธํ•จ
โ—ฆ
airflow.cfg์—์„œ dags_dir_list_interval์—์„œ ์Šค์บ” ์ฃผ๊ธฐ ์„ค์ • ๊ฐ€๋Šฅ (๊ธฐ๋ณธ์€ 300์ดˆ)
โ—ฆ
์ œ๋Œ€๋กœ ๋œ DAG์— ๋Œ€ํ•ด์„œ๋Š” ์–ด๋–ค DAG์™€ Task๊ฐ€ ์žˆ๋Š”์ง€ ์•Œ๊ณ  ๋๋‚˜์ง€๋งŒ, ์ผ๋ฐ˜ python ์Šคํฌ๋ฆฝํŠธ์˜ ๊ฒฝ์šฐ ๊ทธ๋Œ€๋กœ ์‹คํ–‰ํ•ด๋ฒ„๋ฆฌ๋Š” ๊ฒƒ. ์ •ํ™•ํžˆ๋Š” dag๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ python ์Šคํฌ๋ฆฝํŠธ์˜ main ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•œ๋‹ค๋Š” ์˜๋ฏธ. ์—ฌ๋Ÿฌ python ์Šคํฌ๋ฆฝํŠธ ์ค‘ dag ํŒŒ์ผ์ธ ๊ฒƒ์„ airflow์— ์ถ”๊ฐ€ํ•ด๋‘”๋‹ค.
์ด๋Ÿฐ ๋™์ž‘ ๋งค์ปค๋‹ˆ์ฆ˜ ๋•Œ๋ฌธ์— ์—ฌ๋Ÿฌ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒ. ํ…Œ์ŠคํŠธ ์ฝ”๋“œ ๋“ฑ์„ ๋„ฃ์–ด๋†“์•„๋„ ์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์‹คํ–‰์‹œํ‚จ๋‹คโ€ฆ ํŠน์ • ํ…Œ์ด๋ธ” ๋‚ ๋ฆฌ๋Š” ํ…Œ์ŠคํŠธ ์ฝ”๋“œ๊ฐ€ ์žˆ๋‹ค๋ฉด ๊ทธ๊ฒŒ ํŠน์ • ์ฃผ๊ธฐ๋งˆ๋‹ค ๊ณ„์† ์‹คํ–‰๋˜๋Š” ๊ฒƒ. ํ•ด๊ฒฐ์ฑ…์€ dags ํด๋” ์•ˆ์— .airflowignore ํŒŒ์ผ์„ ๋งŒ๋“ค์–ด์„œ ๊ฑฐ๊ธฐ์— ํ…Œ์ŠคํŠธ ์ฝ”๋“œ ๋„ฃ์–ด๋†“๊ธฐ

Scheduler

โ€ข
๊ฐ์ข… ๋ฉ”ํƒ€ ์ •๋ณด์˜ ๊ธฐ๋ก์„ ๋‹ด๋‹น
โ€ข
DAG Directory ๋‚ด .py ํŒŒ์ผ์—์„œ DAG๋ฅผ ํŒŒ์‹ฑํ•˜์—ฌ DB์— ์ €์žฅ
โ€ข
DAG๋“ค์˜ ์Šค์ผ€์ฅด๋ง ๊ด€๋ฆฌ ๋ฐ ๋‹ด๋‹น
โ€ข
์‹คํ–‰ ์ง„ํ–‰ ์ƒํ™ฉ๊ณผ ๊ฒฐ๊ณผ๋ฅผ DB์— ์ €์žฅ
โ€ข
Executor๋ฅผ ํ†ตํ•ด ์‹ค์ œ๋กœ ์Šค์ผ€์ฅด๋ง๋œ DAG๋ฅผ ์‹คํ–‰

Scheduler - Executor

โ€ข
Local Executor
โ—ฆ
DAG Run์„ ํ”„๋กœ์„ธ์Šค ๋‹จ์œ„๋กœ ์‹คํ–‰
โ–ช
Local Executor
โ€ข
ํ•˜๋‚˜์˜ DAG Run์„ ํ•˜๋‚˜์˜ ํ”„๋กœ์„ธ์Šค๋กœ ๋„์›Œ์„œ ์‹คํ–‰
โ€ข
์ตœ๋Œ€๋กœ ์ƒ์„ฑํ•  ํ”„๋กœ๋ ˆ์Šค ์ˆ˜๋ฅผ ์ •ํ•˜์—ฌ ์‚ฌ์šฉ
โ–ช
Sequential Executor
โ€ข
ํ•˜๋‚˜์˜ ํ”„๋กœ์„ธ์Šค์—์„œ ๋ชจ๋“  DAG Run๋“ค์„ ์ฒ˜๋ฆฌ
โ€ข
Airflow์˜ ๊ธฐ๋ณธ Executor
โ€ข
Remote Executor
โ—ฆ
DAG Run์„ ์™ธ๋ถ€ ํ”„๋กœ์„ธ์Šค๋กœ ์‹คํ–‰
โ–ช
Celery Executor
โ€ข
DAG Run์„ Celery Worker Process๋กœ ์‹คํ–‰
โ–ช
Kubernetes Executor
โ€ข
DAG Run ํ•˜๋‚˜๊ฐ€ Pod (== k8s ์œ„์—์„œ ์—์–ดํ”Œ๋กœ์šฐ ์‚ฌ์šฉ ์‹œ ์ด์šฉํ•˜๋Š” executor)

Metadata DB

โ€ข
๋ฉ”ํƒ€ ์ •๋ณด๋ฅผ ์ €์žฅํ•˜๋Š” ๊ณณ
โ€ข
Scheduler์— ์˜ํ•ด ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๊ฐ€ ์Œ“์ž„
โ€ข
Sqlite๊ฐ€ ๊ธฐ๋ณธ์ด๋‚˜ ๋ณดํ†ต MySQL์ด๋‚˜ Postgres๋ฅผ ์‚ฌ์šฉ
โ€ข
ํŒŒ์‹ฑํ•œ DAG ์ •๋ณด, DAG Run ์ƒํƒœ์™€ ์‹คํ–‰ ๋‚ด์šฉ, Task ์ •๋ณด ๋“ฑ์„ ์ €์žฅ
โ€ข
User์™€ Role์— ๋Œ€ํ•œ ์ •๋ณด ์ €์žฅ
Metadata DB์˜ ๋‚ด์šฉ์„ fernet_key๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์•”ํ˜ธํ™” ๊ฐ€๋Šฅ
โ€ข
fernet_key๋ฅผ ์„ธํŒ… ์•ˆ ํ•˜๋ฉด ์•”ํ˜ธํ™”๋˜์ง€ ์•Š๊ณ  ํ”Œ๋ ˆ์ธ ํ…์ŠคํŠธ๊ฐ€ ์ €์žฅ
โ€ข
๊ทธ๋Ÿฐ๋ฐ fernet_key๋ฅผ ์žƒ์–ด๋ฒ„๋ฆฌ๋ฉด db ๋ณต๊ตฌ๋ฅผ ๋ชปํ•จ;;;

Worker

โ€ข
DAG๋ฅผ ์‹ค์ œ๋กœ ์‹คํ–‰์‹œํ‚ด
โ—ฆ
Scheduler์— ์˜ํ•ด ์ƒ๊ธฐ๊ณ  ์‹คํ–‰๋จ
โ—ฆ
Executor์— ๋”ฐ๋ผ Worker์˜ ํ˜•ํƒœ๊ฐ€ ๋‹ค๋ฆ„
โ–ช
Celery ํ˜น์€ Local Executor์˜ ๊ฒฝ์šฐ Worker๋Š” ํ”„๋กœ์„ธ์Šค
โ–ช
K8S Executor์˜ ๊ฒฝ์šฐ Worker๋Š” ํŒŒ๋“œ
โ—ฆ
DAG Run์„ ์‹คํ–‰ํ•˜๋Š” ๊ณผ์ •์—์„œ ์ƒ๊ธด ๋กœ๊ทธ๋ฅผ ์ €์žฅ

Webserver

โ€ข
Web UI๋ฅผ ๋‹ด๋‹น (flask๋กœ ๊ตฌํ˜„)
โ€ข
REST API๋ฅผ ์ œ๊ณตํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๊ผญ Web UI๋ฅผ ํ†ตํ•˜์ง€ ์•Š์•„๋„ ํ†ต์‹  ๊ฐ€๋Šฅ
โ€ข
๋ณดํ†ต ์ด ์›น์„œ๋ฒ„๋ฅผ ์ด์šฉํ•˜์—ฌ DAG๋ฅผ ON/OFF, ๋ชจ๋‹ˆํ„ฐ๋ง ํ•จ
โ€ข
Metadata DB์™€ ํ†ต์‹ ํ•˜๋ฉฐ ์œ ์ €์—๊ฒŒ ํ•„์š”ํ•œ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ์›น ๋ธŒ๋ผ์šฐ์ €๋กœ ๋ณด์—ฌ์ฃผ๊ณ  ์‹œ๊ฐํ™” ํ•จ

์‹ค์Šต 2: DAG ๊ฐœ๋ฐœ ์ฐ๋จนํ•ด๋ณด๊ธฐ

์‹ค์Šต ์ „ - ๊ฐœ๋… ์กฐ๊ธˆ ๋”

DAG Parameter (not task parameter)
dag = DAG( dag_id = "helloWorld", start_date = datetime(2022,6,15), #์›๋ž˜ ์‹œ์ž‘ํ–ˆ์–ด์•ผ ํ•œ ๋‚ ์งœ ํ•˜์ง€๋งŒ ๋ช…์‹œ์ ์œผ๋กœ ์‹œ์ž‘ ์•ˆ ํ•ด์ฃผ๋ฉด dag๋Š” ์‹œ์ž‘ ์•ˆ ํ•จ catchup=False, #start_date์™€ ์‹ค์ œ ์‹œ์ž‘์ผ ์‚ฌ์ด์— ์‹คํ–‰ ์•ˆ ๋œ ๊ฒƒ์„ ํ•œ ๋ฒˆ์— ์‹คํ–‰ํ•ด์ฃผ๋Š” ์˜ต์…˜ (๋ชจ๋ฅด๊ฒ ์œผ๋ฉด falseโ€ฆ) tags=['example'], #ํŒ€๋ณ„๋กœ, ํƒœ์Šคํฌ๋ณ„๋กœ tag ๋ถ™์ผ ์ˆ˜ ์žˆ์Œ (DAG ์ˆ˜๋ฐฑ๊ฐœ ๋˜๋ฉด ํ•˜๋‚˜ํ•˜๋‚˜ ๊ธฐ์–ตํ•  ์ˆ˜ ์—†์Œ. tag ํ•„์ˆ˜) schedule = '0 2 * * *', #cron์œผ๋กœ ํ‘œํ˜„ default_args={ #dag ๋ฐ‘์— ์†ํ•œ task์— ์ ์šฉ๋˜๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ 'on_failure_callback': slack.on_failure_callback, 'retries': 1, 'retry_delay': timedelta(minutes=1) } )
Python
๋ณต์‚ฌ
โ€ข
max_active_runs: DAG ์ธ์Šคํ„ด์Šค๊ฐ€ ํ•œ ๋ฒˆ์— ๋ช‡ ๊ฐœ๊นŒ์ง€ ๋Œ์•„๊ฐˆ ์ˆ˜ ์žˆ๋Š”๊ฐ€
โ€ข
max_active_tasks: ํƒœ์Šคํฌ๋“ค์ด ํ•œ ๋ฒˆ์— ๋ช‡ ๊ฐœ๊นŒ์ง€ ๋Œ์•„๊ฐˆ ์ˆ˜ ์žˆ๋Š”๊ฐ€
โ‡’ ๋‘˜์€ worker์˜ CPU ์ฝ”์–ด ์ˆ˜์— ์˜ํ•ด upper limit์ด ๊ฑธ๋ฆผ (๋งŽ์ด ์„ค์ •ํ•ด๋†”๋„ ๋™์‹œ์— ๋Œ์•„๊ฐˆ ์ˆ˜ ์žˆ๋Š” ํšŸ์ˆ˜๋Š” ์ œํ•œ)
โ€ข
catchup: whether to backfill pasts runs
โ€ข
DAG ํŒŒ๋ผ๋ฏธํ„ฐ์™€ Task ํŒŒ๋ผ๋ฏธํ„ฐ ์ฐจ์ด์ ์„ ์ดํ•ดํ•ด์•ผ ํ•จ!
โ—ฆ
DAG ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” DAG ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค ๋•Œ ์ง€์ •ํ•ด์ค˜์•ผ ํ•จ
Task Parameters - default_arg
โ€ข
๋’ค์— DAG ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค ๋•Œ ์ง€์ •ํ•จ
โ€ข
ํŒŒ๋ผ๋ฏธํ„ฐ ์˜ˆ์‹œ:
โ—ฆ
owner: owner๊ฐ€ ๋ˆ„๊ตฌ์ธ์ง€
โ—ฆ
email: ์‹คํŒจํ–ˆ์„ ๋•Œ ๋ˆ„๊ตฌ์—๊ฒŒ ์ด๋ฉ”์ผ?
โ—ฆ
retries: ์‹คํŒจ์‹œ ์žฌ์‹คํ–‰ ํšŸ์ˆ˜
โ—ฆ
retry_delay: ์žฌ์‹คํ–‰ ๊ฐ„๊ฒฉ
โ—ฆ
on_failure_callback: ์‹คํŒจํ–ˆ์„ ๋•Œ ์‹คํ–‰ํ•  ํ•จ์ˆ˜ (๋ณดํ†ต ๋ˆ„๊ตฌ์—๊ฒŒ ์ด๋ฉ”์ผ ๋ณด๋‚ด๋ผ ์ด๋Ÿฐ ํ•จ์ˆ˜ ์‚ฌ์šฉํ•จ)
โ—ฆ
on_success_callbak: ์„ฑ๊ณตํ–ˆ์„ ๋•Œ ์‹คํ–‰ํ•  ํ•จ์ˆ˜

์‹ค์Šต ์ „ - ์ฝ”๋“œ ๊ตฌ๊ฒฝํ•ด๋ณด๊ธฐ

์šฐ์„  ์˜ˆ์‹œ๋ฅผ ๋ณด๋ฉด์„œ ๊ฐ์„ ์žก์•„๋ด…์‹œ๋‹ค
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG( dag_id = 'HelloWorld', start_date = datetime(2022,6,14), catchup=False, tags=['example'], schedule = '0 2 * * *') def print_hello(): print("hello!") return "hello!" def print_goodbye(): print("goodbye!") return "goodbye!" print_hello = PythonOperator( task_id = 'print_hello', #python_callable param points to the function you want to run python_callable = print_hello, #dag param points to the DAG that this task is a part of dag = dag) print_goodbye = PythonOperator( task_id = 'print_goodbye', python_callable = print_goodbye, dag = dag) #Assign the order of the tasks in our DAG print_hello >> print_goodbye
Python
๋ณต์‚ฌ
์กฐ๊ธˆ ๋” ๋””๋ฒจ๋กญ์‹œํ‚ค๋ฉด ์•„๋ž˜์™€ ๊ฐ™์€ ํ˜•ํƒœ๋กœ ๊ฐ„๋‹จํžˆ ํ‘œํ˜„ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค
from airflow import DAG from airflow.decorators import task from datetime import datetime @task def print_hello(): print("hello!") return "hello!" @task def print_goodbye(): print("goodbye!") return "goodbye!" with DAG( dag_id = 'HelloWorld_v2', start_date = datetime(2022,5,5), catchup=False, tags=['example'], schedule = '0 2 * * *' ) as dag: # Assign the tasks to the DAG in order print_hello() >> print_goodbye()
Python
๋ณต์‚ฌ

๋ณธ๊ฒฉ์ ์œผ๋กœ ์‹ค์Šตํ•ด๋ณด๊ธฐ

๋ณธ๊ฒฉ์ ์œผ๋กœ DAG ๊ฐœ๋ฐœ์„ ํ•ด๋ด…์‹œ๋‹ค.
์•„๋ž˜ ์ฝ”๋“œ๋Š” indeed์—์„œ airflow ๊ธฐ์ˆ  ์Šคํƒ์„ ์‚ฌ์šฉํ•˜๋Š” ์ฑ„์šฉ ๊ณต๊ณ ๋ฅผ ๊ฒ€์ƒ‰, title์„ ํฌ๋กค๋งํ•˜์—ฌ ์ €์žฅํ•˜๋Š” ์•„์ฃผ์•„์ฃผ ๊ฐ„๋‹จํ•œ ETL์„ airflow๋กœ ๋Œ๋ ค๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.

๋ณธ๊ฒฉ์ ์œผ๋กœ ์‹ค์Šตํ•ด๋ณด๊ธฐ - ๋ฏผ๊ฐํ•œ ์ •๋ณด ๋งˆ์Šคํ‚น

Airflow์—์„œ๋Š” Connection๊ณผ Variables์„ ํ†ตํ•ด ๋ฏผ๊ฐํ•œ DB ํ˜น์€ ํด๋ผ์šฐ๋“œ Connection ์ •๋ณด, ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋ฅผ ๋งˆ์Šคํ‚นํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฒˆ ์‹ค์Šต์—์„œ Connection์„ ํ™œ์šฉํ•ด Postgresql Connection ์ •๋ณด๋ฅผ, Variables๋ฅผ ํ™œ์šฉํ•ด ๋ฐ์ดํ„ฐ ์†Œ์Šค ๋งํฌ๋ฅผ ๋งˆ์Šคํ‚นํ•ด๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.
1.
Airflow Webserver์—์„œ Admin์— Connections๋ฅผ ๋ˆ„๋ฅธ๋‹ค
โ€ข
+์„ ๋ˆŒ๋Ÿฌ ์ƒˆ๋กœ์šด ์ปค๋„ฅ์…˜์„ ๋“ฑ๋กํ•ด์ค€๋‹ค
3.
Connection Type์„ โ€˜Postgresโ€™๋กœ ๋ณ€๊ฒฝํ•˜๊ณ  ์ •๋ณด๋ฅผ ์ž…๋ ฅํ•œ๋‹ค
โ€ข
Connection Id: postgres_conn_id
โ€ข
Connection Type: Postgres
โ€ข
Host: ์ž์‹ ์˜ Postgres ์ปจํ…Œ์ด๋„ˆ ์ด๋ฆ„
โ€ข
Login: airflow
โ€ข
password: airflow
โ€ข
port: 5432
4.
Test ํ›„์— ์—ฐ๊ฒฐ์ด ๋œ๋‹ค๋ฉด Save๋ฅผ ๋ˆŒ๋Ÿฌ Connection์„ ๋“ฑ๋ก ์™„๋ฃŒํ•œ๋‹ค
5.
Admin์—์„œ Variables๋ฅผ ํด๋ฆญํ•œ๋‹ค
6.
Connection๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ +๋ฅผ ๋ˆŒ๋Ÿฌ ํ™˜๊ฒฝ ๋ณ€์ˆ˜๋ฅผ ์ƒˆ๋กœ ๋“ฑ๋กํ•œ๋‹ค
7.
Variables๋Š” ํ‚ค-๋ฐธ๋ฅ˜ ํ˜•์‹์œผ๋กœ ๋“ฑ๋ก ๊ฐ€๋Šฅํ•˜๋‹ค
โ€ข
Key: job_search_url
8.
Save ๋ฒ„ํŠผ์„ ๋ˆŒ๋Ÿฌ ๋“ฑ๋ก

๋ณธ๊ฒฉ์ ์œผ๋กœ ์‹ค์Šตํ•ด๋ณด๊ธฐ - DAG ์ž‘์„ฑํ•˜๊ธฐ

mkdir dags && cd dags
Python
๋ณต์‚ฌ
dags ํด๋”์— boaz_simple_etl.py ํŒŒ์ผ์„ ํ•˜๋‚˜ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค (vi ํ˜น์€ IDE ํ™œ์šฉ)
# boaz_simple_etl.py from datetime import datetime, timedelta from airflow import DAG from airflow.models import Variable import requests from bs4 import BeautifulSoup import csv # ์‚ฌ์šฉํ•  Operator Import from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator def extract(link): f = requests.get(link) return f.text def transform(text): soup = BeautifulSoup(text, 'html.parser') titles = [] tmp = soup.select(".title") for title in tmp: titles.append(title.get_text()) return titles def load(titles): with open("job_search_result.csv", "w") as file: writer = csv.writer(file) writer.writerow(titles) def etl(): link = "https://kr.indeed.com/jobs?q=airflow" text = extract(link) titles = transform(text) load(titles) # DAG ์„ค์ • dag = DAG( dag_id="boaz_simple_etl", schedule_interval="@daily", start_date=datetime(2022,6,15), #์›๋ž˜ ์‹œ์ž‘ํ–ˆ์–ด์•ผ ํ•œ ๋‚ ์งœ ํ•˜์ง€๋งŒ ๋ช…์‹œ์ ์œผ๋กœ ์‹œ์ž‘ ์•ˆ ํ•ด์ฃผ๋ฉด dag๋Š” ์‹œ์ž‘ ์•ˆ ํ•จ default_args = { 'retries': 3, 'retry_delay': timedelta(minutes=3), 'on_failure_callback': slack.on_failure_callback, # ์‹คํŒจ์‹œ SLACK ํ•จ์ˆ˜ ์š”์ฒญ }, #dag ๋ฐ‘์— ์†ํ•œ task์— ์ ์šฉ๋˜๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ tags=["BOAZ"], #ํŒ€๋ณ„๋กœ, ํƒœ์Šคํฌ๋ณ„๋กœ tag ๋ถ™์ผ ์ˆ˜ ์žˆ์Œ (DAG ์ˆ˜๋ฐฑ๊ฐœ ๋˜๋ฉด ํ•˜๋‚˜ํ•˜๋‚˜ ๊ธฐ์–ตํ•  ์ˆ˜ ์—†์Œ. tag ํ•„์ˆ˜) catchup=False #start_date์™€ ์‹ค์ œ ์‹œ์ž‘์ผ ์‚ฌ์ด์— ์‹คํ–‰ ์•ˆ ๋œ ๊ฒƒ์„ ํ•œ ๋ฒˆ์— ์‹คํ–‰ํ•ด์ฃผ๋Š” ์˜ต์…˜ (๋ชจ๋ฅด๊ฒ ์œผ๋ฉด falseโ€ฆ) ) # Operator ๋งŒ๋“ค๊ธฐ creating_table = PostgresOperator( task_id="creating_table", postgres_conn_id="postgres_conn_id", # ์›นUI์—์„œ connection์„ ๋“ฑ๋กํ•ด์ค˜์•ผ ํ•จ. sql=''' CREATE TABLE IF NOT EXISTS job_search_result ( title TEXT ) ''', dag = dag ) etl = PythonOperator( task_id = "etl", python_callable = etl, # ์‹คํ–‰ํ•  ํŒŒ์ด์ฌ ํ•จ์ˆ˜ dag = dag ) # ์ €์žฅ๋˜์—ˆ๋‹ค๋Š” ์‚ฌ์‹ค echo store_result = BashOperator( task_id="store_result", bash_command= "echo 'ETL Done'", dag=dag ) # ํŒŒ์ดํ”„๋ผ์ธ ๊ตฌ์„ฑํ•˜๊ธฐ creating_table >> etl >> store_result
Python
๋ณต์‚ฌ

Webserver์—์„œ DAG ์‹คํ–‰ํ•ด๋ณด๊ธฐ

dags ํด๋”์—์„œ dag๋ฅผ ์ž‘์„ฑํ•˜๊ฒŒ ๋˜๋ฉด scheduler๋Š” ์ด ํŒŒ์ผ์„ ํŒŒ์‹ฑํ•˜์—ฌ web ui์—์„œ๋„ ๋ณผ ์ˆ˜ ์žˆ๋„๋ก ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค. dag๋Š” ์ž‘์„ฑํ–ˆ๋‹ค๊ณ  ํ•ด์„œ ๋ฐ”๋กœ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์€ ์•„๋‹™๋‹ˆ๋‹ค. ์™ผ์ชฝ ํ† ๊ธ€ ๋ฒ„ํŠผ์„ ํด๋ฆญํ•˜์—ฌ DAG๋ฅผ ์‹คํ–‰์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
์‹คํ–‰ ํ›„ boaz_simple_etl์„ ํด๋ฆญํ•˜์—ฌ ๋“ค์–ด๊ฐ€๋ฉด ์•„๋ž˜์™€ ๊ฐ™์€ ํ™”๋ฉด์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐ ํƒœ์Šคํฌ์˜ ์ง„ํ–‰ ์ƒํ™ฉ๊ณผ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋“ค์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋•Œ ๊ฐ ํƒœ์Šคํฌ๊ฐ€ ์ดˆ๋ก์ƒ‰์ด๋ผ๋ฉด ์„ฑ๊ณต, ๋…ธ๋ž€์ƒ‰์ด๋ผ๋ฉด ์‹คํ–‰ ์ค‘, ๋นจ๊ฐ„์ƒ‰์ด๋ผ๋ฉด ์‹คํŒจํ–ˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. (Graph๋ฅผ ํด๋ฆญํ•˜์—ฌ ์›Œํฌํ”Œ๋กœ์šฐ ๊ณผ์ •์„ ์กฐ๊ธˆ ๋” ์ง๊ด€์ ์œผ๋กœ ๋ณผ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค)
๋งŒ์•ฝ ๊ณ„์†ํ•ด์„œ ์‹คํ–‰์ด ์ง€์†๋˜๊ฑฐ๋‚˜, ํƒœ์Šคํฌ ์‹คํ–‰์— ์‹คํŒจํ–ˆ๋‹ค๋ฉด ๊ฐ ํƒœ์Šคํฌ๋ฅผ ํด๋ฆญํ•˜์—ฌ log๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. DAG ๋””๋ฒ„๊น…์€ ์ด ์‹คํ–‰ ๋กœ๊ทธ๋“ค์„ ๋ณด๋ฉฐ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์‹ฌํ™” - Xcom์„ ํ™œ์šฉํ•œ ํƒœ์Šคํฌ๋ผ๋ฆฌ์˜ ๋ฐ์ดํ„ฐ ๊ตํ™˜

์‹ฌํ™” - Decorator ์‚ฌ์šฉํ•˜๊ธฐ

Airflow ์™€ ์‹œ๊ฐ„

์—์–ดํ”Œ๋กœ์šฐ๋Š” ์ฃผ์š” ๊ธฐ๋Šฅ ์ค‘ ํ•˜๋‚˜๋Š” ์Šค์ผ€์ฅด๋ง์ž…๋‹ˆ๋‹ค. ์ฆ‰, ์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์‹œ๊ฐ„์„ ์–ด๋–ป๊ฒŒ ๋‹ค๋ฃจ๋Š”๊ฐ€๋Š” ๋งค์šฐ ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค (๊ทธ๋ฆฌ๊ณ  ๊ณจ๋•Œ๋ฆฝ๋‹ˆ๋‹ค)

Airflow์™€ ํƒ€์ž„์กด

airflow.cfg์—์„œ ํƒ€์ž„์กด ์„ค์ •์„ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ธฐ๋ณธ ํƒ€์ž„์กด์€ UTC๋ฅผ ๋”ฐ๋ฆ…๋‹ˆ๋‹ค.
โ€ข
default_timezone: start_date, end_date, schedule์˜ ํƒ€์ž„์กด์— ์˜ํ–ฅ
โ€ข
default_ui_timezone: web server ์šฐ์ธก ์ƒ๋‹จ์— ํƒ€์ž„์กด ์„ธํŒ…์— ๋ฐ˜์˜๋จ
ํ›„์—๋„ ์ด์•ผ๊ธฐํ•˜๊ฒ ์ง€๋งŒ ํƒ€์ž„์กด๊ณผ ์Šค์ผ€์ฅด๋ง์€ ๊ฝค ๊นŒ๋‹ค๋กœ์šด ์ž‘์—…์ด๊ธฐ ๋•Œ๋ฌธ์— KST๊ฐ€ ์•„๋‹Œ UTC๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ž‘์—…ํ•˜๋Š” ๊ฒƒ์ด ์ดˆ๋ณด์ž ์ž…์žฅ์—์„œ ์ •์‹ ๊ฑด๊ฐ•์— ์ด๋กญ์Šต๋‹ˆ๋‹ค

Airflow ์Šค์ผ€์ฅด๋ง - Legacy: start_date์™€ execution_date

์—์–ดํ”Œ๋กœ์šฐ๋Š” ์–ด๋–ค ์‹œ๊ฐ„์„ ๊ธฐ์ค€์œผ๋กœ ์–ธ์ œ ์–ด๋–ป๊ฒŒ ์Šค์ผ€์ฅด๋ง์ด ๋ ๊นŒ์š”? ๊ฐ€์žฅ ๊ธฐ๋ณธ์ ์ธ ๋™์‹œ์— ๊ฐ€์žฅ ๋ฌธ์ œ์ ์ธ ๊ฐœ๋…์ธ start_date์™€ execution_date์— ๋Œ€ํ•ด์„œ ๊ฐ„๋‹จํžˆ ์•Œ์•„๋ด…์‹œ๋‹ค
1.
์›ํ™œํ•œ ์ดํ•ด๋ฅผ ์œ„ํ•ด DAG์˜ ์‹คํ–‰ ์ฃผ๊ธฐ start_date execution_date๋Š” ์„œ๋กœ ๋‹ค๋ฅธ ๊ฐœ๋…์ž„์„ ์šฐ์„  ์ˆ™์ง€ํ•ฉ๋‹ˆ๋‹ค.
2.
์—์–ดํ”Œ๋กœ์šฐ์˜ ์Šค์ผ€์ฅด๋ง ์ปจ์…‰์€ ์ผ ๋ฐฐ์น˜๋ฉด ํ•˜๋ฃจ ์ „ ๊ธฐ์ค€์œผ๋กœ ๋Œ๊ณ , ์‹œ๊ฐ„ ๋ฐฐ์น˜๋ฉด ์‹œ๊ฐ„ ์ „ ๊ธฐ์ค€์œผ๋กœ ๋„๋Š” ๊ฒƒ. ์ฆ‰, ๋ฐฐ์น˜ ๊ฐ„๊ฒฉ๋งŒํผ์˜ ๊ณผ๊ฑฐ๋ฅผ ์ฝ์–ด์˜ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.
3.
์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์‹คํ–‰๋˜๋Š” ๊ฒƒ๊ณผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์™€์•ผ ํ•˜๋Š” ์‹œ์ ์„ ์ž˜ ๊ตฌ๋ถ„ํ•ด์„œ ์ƒ๊ฐํ•ฉ๋‹ˆ๋‹ค
์‰ฝ๊ฒŒ DAG์˜ ์‹คํ–‰ ์ฃผ๊ธฐ๋Š” ํ•˜๋ฃจ์ž„์„ ๊ฐ€์ •ํ•˜๊ณ  ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค. ๋งค์ผ 00:01์— ์ „๋‚  ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์™€์„œ ๊ณ„์‚ฐํ›„ ๋‹ค์‹œ ๋กœ๋“œํ•˜๋Š” DAG๋ผ๊ณ  ๊ฐ€์ •ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.
schedule_interval (DAG์˜ ์‹คํ–‰ ์ฃผ๊ธฐ)
โ€ข
default๋Š” None
โ€ข
schedule_interval="@daily" ์™€ ๊ฐ™์ด @daily ๋งคํฌ๋กœ๋Š” ๋งค์ผ ์ž์ •์— DAG๋ฅผ ์‹คํ–‰ํ•˜๋„๋ก ์˜ˆ์•ฝํ•ฉ๋‹ˆ๋‹ค. ์ด ์™ธ์—๋„ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋งคํฌ๋กœ๋ฅผ ์ง€์›
โ€ข
cron์„ ํ™œ์šฉํ•˜์—ฌ ์Šค์ผ€์ฅด๋ง ๊ฐ€๋Šฅ
โ€ข
timedelta ์ธ์Šคํ„ด์Šค๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋นˆ๋„ ๊ธฐ๋ฐ˜ ์Šค์ผ€์ฅด์„ ์ •์˜ ๊ฐ€๋Šฅ
ํ”„๋ฆฌ์…‹
์„ค๋ช…
@once
1ํšŒ๋งŒ ์‹คํ–‰ํ•˜๋„๋ก ์Šค์ผ€์ค„
@hourly
๋งค์‹œ๊ฐ„ ๋ณ€๊ฒฝ ์‹œ 1ํšŒ ์‹คํ–‰
@daily
๋งค์ผ ์ž์ •์— 1ํšŒ ์‹คํ–‰
@weekly
๋งค์ฃผ ์ผ์š”์ผ ์ž์ •์— 1ํšŒ ์‹คํ–‰
@monthly
๋งค์›” 1์ผ ์ž์ •์— 1ํšŒ ์‹คํ–‰
@yearly
๋งค๋…„ 1์›” 1์ผ ์ž์ •์— 1ํšŒ ์‹คํ–‰
start_date
DAG๊ฐ€ ์ฒ˜์Œ ์‹คํ–‰๋˜์–ด์•ผ ํ•˜๋Š” ๋‚ ์งœ๊ฐ€ ์•„๋‹Œ ์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์–ด๋–ค ๋‚ ์งœ๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ฃผ๊ธฐ๋ฅผ ๊ณ„์‚ฐํ•ด์•ผ ํ•˜๋Š”์ง€์— ๋Œ€ํ•œ ์ •๋ณด์ž…๋‹ˆ๋‹ค. ์ด๊ฒŒ ๋ฌด์Šจ ๋ง์ด๋ƒ
start_date๊ฐ€ 2023๋…„ 1์›” 1์ผ๋กœ ์„ค์ •๋˜์–ด ์žˆ์„ ๋•Œ ๋ฐ์ผ๋ฆฌ ๋Œ€๊ทธ์˜ ์‹คํ–‰๊ณผ ์ฒ˜๋ฆฌ๋Š” ์•„๋ž˜์™€ ๊ฐ™์Šต๋‹ˆ๋‹ค.
โ€ข
์ฒ˜์Œ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์€ 2023๋…„ 1์›” 2์ผ 00์‹œ 01๋ถ„
โ€ข
์ฒ˜์Œ ์ฝ์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋Š” 2023๋…„ 1์›” 1์ผ์˜ ๋ฐ์ดํ„ฐ
๋งŒ์•ฝ start_date๊ฐ€ 2023๋…„ 1์›” 1์ผ์ด์ง€๋งŒ ์‹คํ–‰ํ•˜๋Š” ์‹œ์ (web ui์—์„œ ํ† ๊ธ€ ๋ฒ„ํŠผ์„ ํ™œ์„ฑํ™”ํ•˜๋Š” ์‹œ์ )์ด 2023๋…„ 8์›” 23์ผ์ด๋ผ๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ์š”? (catchup์€ false๋ผ๊ณ  ๊ฐ€์ •) ๋ฐ”๋กœ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ
โ€ข
์ฒ˜์Œ DAG๊ฐ€ ์‹คํ–‰๋˜๋Š” ์‹œ์ ์€ 2023๋…„ 8์›” 24์ผ 00์‹œ 01๋ถ„
โ€ข
์ฒ˜์Œ ์ฝ์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋Š” 2023๋…„ 8์›” 23์ผ์˜ ๋ฐ์ดํ„ฐ
DAG๊ฐ€ ์‹œ์ž‘๋˜๋Š” ๊ธฐ์ค€ ์‹œ์ . ๋ฐฐ์น˜ ๊ฐ„๊ฒฉ๋งŒํผ ๊ณผ๊ฑฐ๋ฅผ ์ฝ์–ด์˜ค๋Š” ๊ฒƒ์ด๊ธฐ ๋•Œ๋ฌธ์— start_date๋ฅผ ์‹œ์ž‘์œผ๋กœ ๋ฐฐ์น˜ * n ์‹œ๊ฐ„์ด ํ˜๋Ÿฌ๊ฐ„ ์‹œ์ ๋ถ€ํ„ฐ ๋ฐฐ์น˜ * (n-1) ์˜ DAG ๋ฐฐ์น˜๊ฐ€ ๋Œ์•„๊ฐˆ ๊ฒƒ์ž…๋‹ˆ๋‹ค.
ํ˜„์žฌ ์‹œ๊ฐ„์ด start_date๋ณด๋‹ค ์ด์ „์ด๋ฉด DAG๊ฐ€ ์‹คํ–‰๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. DAG๋ฅผ ๋งŒ๋“ค๊ณ  ์ง€๊ธˆ ๋ฐ”๋กœ ๋Œ๋ ค๋ณด๊ณ  ์‹ถ์œผ๋ฉด start_date๋ฅผ ์ตœ์†Œ ํ•˜๋ฃจ ์ „์œผ๋กœ ์žก์•„์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค.
execution_date
์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ํŠน์ • ๋ฐฐ์น˜์—์„œ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•  ๋ฐ์ดํ„ฐ์˜ ์‹œ๊ฐ„ ๋ฒ”์œ„ ์ค‘ ๊ฐ€์žฅ ๋น ๋ฅธ ์‹œ์ ์ž…๋‹ˆ๋‹ค. execution_date๋Š” ์‹ค์ œ ์ฒ˜๋ฆฌ์— ์‚ฌ์šฉ๋˜๊ธฐ ๋ณด๋‹ค๋Š” dag run ์ธ์Šคํ„ด์Šค, task ์ธ์Šคํ„ด์Šค๋ฅผ ๊ตฌ๋ถ„ํ•˜๋Š” ID๊ฐ€ ๋œ๋‹ค. (dag run ์ธ์Šคํ„ด์Šค์™€ task ์ธ์Šคํ„ด์Šค ๊ฐ„์˜ ๊ด€๊ณ„ ๊ธฐ์ค€์ด ๋˜๊ธฐ๋„ ํ•ฉ๋‹ˆ๋‹ค)
+ catchup
โ€ข
DAG๊ฐ€ ์ฒ˜์Œ ํ™œ์„ฑํ™”๋œ ์‹œ์ ์ด start_date๋ณด๋‹ค ๋ฏธ๋ž˜ ์‹œ์ ์ผ ๊ฒฝ์šฐ
โ€ข
start_date์™€ ์ฒ˜์Œ ์‹คํ–‰๋œ ์‹œ๊ฐ„ ์‚ฌ์ด, ๋ฐฐ์น˜๊ฐ€ ๋Œ์ง€ ์•Š์€ ์‹œ์ ๋“ค์— ๋Œ€ํ•ด์„œ ์–ด๋–ป๊ฒŒ ํ•  ๊ฒƒ์ธ์ง€ ๊ฒฐ์ •ํ•ด์ฃผ๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ
โ—ฆ
True: ์‹คํ–‰๋˜์ง€ ์•Š์€ ๊ฒƒ๋“ค ๋ชจ๋‘ ์‹คํ–‰
โ—ฆ
False: ์‹คํ–‰ํ•˜์ง€ ์•Š๊ณ  ๋ƒ…๋‘ 
โ€ข
metadata db์— ์‹คํ–‰ ์—ฌ๋ถ€ ๊ธฐ๋ก์ด ๋‚จ๊ธฐ ๋•Œ๋ฌธ์— docker compose down ํ–ˆ๋‹ค๊ฐ€ up๋˜์–ด๋„ ๋‹ค์‹œ ์ฝ์–ด์˜ค๋‹ˆ ์ฃผ์˜ํ•˜์—ฌ ํŒŒ๋ผ๋ฏธํ„ฐ ์‚ฌ์šฉ

Airflow ์Šค์ผ€์ฅด๋ง - New: data_interval

์ผ๋ถ€ ์‚ฌ๋žŒ๋“ค์ด ์•„๋‹Œ ๊ฑฐ์˜ ๋Œ€๋‹ค์ˆ˜์˜ ์‚ฌ๋žŒ๋“ค์ด execution_date๋ฅผ ๋งค์šฐ ํ—ท๊ฐˆ๋ คํ–ˆ๊ธฐ ๋•Œ๋ฌธ์—,,, 2021๋…„ 12์›” 2์ผ Airflow 2.2์—์„œ data_interval์ด๋ผ๋Š” ์ƒˆ๋กœ์šด ๊ฐœ๋…์ด ๋„์ž…๋ฉ๋‹ˆ๋‹ค.
๋ชจ๋‘๊ฐ€ too confusing์ด๋ผ๊ณ  ์™ธ์ณค์Œโ€ฆ
์ด์ œ๋Š” ์กฐ๊ธˆ ๋” ์ง๊ด€์ ์œผ๋กœ ์•„๋ž˜์™€ ๊ฐ™์ด ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค
โ€ข
์‹ค์ œ ์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์ฝ์–ด์˜ค๋Š” ์‹œ์ž‘ ์‹œ์ ์„ data_interval_start
โ€ข
์—์–ดํ”Œ๋กœ์šฐ๊ฐ€ ์–ธ์ œ ์ฝ์–ด์˜ค๋Š” ๊ฒƒ์„ ๋งˆ๋ฌด๋ฆฌํ•ด์•ผ ํ•˜๋Š”์ง€์— ๋Œ€ํ•œ ์‹œ์ ์„ data_interval_end
๋˜ํ•œ ์‹ค์ œ ์ž‘์—…์ด ์‹คํ–‰๋˜๋Š” ์‹œ์ ์„ ํ‘œํ˜„ํ•˜๋Š”๋ฐ ์žˆ์–ด์„œ๋Š” logical_date๋ผ๊ณ  ํ‘œํ˜„ํ•˜๊ธฐ ์‹œ์ž‘ํ•˜์˜€์Šต๋‹ˆ๋‹ค. (execution์ด๋ผ๋Š” ๋‹จ์–ด ์ž์ฒด๊ฐ€ ํ—ท๊ฐˆ๋ฆฌ๊ฒŒ ํ•˜๋Š”๋ฐ ํฐ ๋น„์ค‘์„ ์ฐจ์ง€ํ•˜๊ธด ํ–ˆ์Œ,,,)
์ฆ‰, ์ž‘์—…์ดย ์‹คํ–‰๋˜๋Š” ์‹œ์ ์€ย logical_date๋กœ ํ‘œํ˜„ํ•˜๊ณ , ํ•ด๋‹น ์‹œ์ ์— ์‹คํ–‰๋˜๋Š”ย ์ž‘์—…์ด ์ฐธ๊ณ ํ•ด์•ผ ํ•  ๋ฐ์ดํ„ฐ์˜ ๋‚ ์งœ ๋ฒ”์œ„๋Š”ย data_interval_start์™€ย data_interval_end๋กœ ๊ฐ๊ฐ ๋‚˜๋ˆ„์–ด ํ‘œํ˜„ํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋˜์—ˆ์Šต๋‹ˆ๋‹ค
๊ธฐ์กด์— ์‚ฌ์šฉ๋˜๋˜ ๋งคํฌ๋กœ/๋ณ€์ˆ˜๋“ค๊ณผ ์ƒˆ๋กœ ๋ณ€๊ฒฝ๋˜๋Š” ๋ณ€์ˆ˜๋“ค์„ ํ‘œ๋กœ ์ •๋ฆฌํ•˜๋ฉด ์•„๋ž˜์™€ ๊ฐ™์Šต๋‹ˆ๋‹ค.
๊ธฐ์กด ์ด๋ฆ„
์ƒˆ๋กœ์šด ์ด๋ฆ„
execution_date
data_interval_start, logical_date
next_execution_date
data_interval_end
tomorrow_ds
deprecate
tomorrow_ds_nodash
deprecate
yesterday_ds
deprecate
yesterday_ds_nodash
deprecate
prev_execution_date
prev_logical_date
next_execution_date
next_logical_date
๊ทธ ์™ธ์—๋„ย execution_date๋Š” Airflow ์Šค์ผ€์ค„๋ง์˜ ํ•ต์‹ฌ ๊ฐœ๋…์ด๊ธฐ ๋•Œ๋ฌธ์—, Airflow Metadata DB์—์„œ๋„ ์ด๋ฏธ ํญ๋„“๊ฒŒ ์‚ฌ์šฉ๋˜๊ณ  ์žˆ์—ˆ์Šต๋‹ˆ๋‹ค. ์ด์— ๋”ฐ๋ผ ๋ณ€๊ฒฝ๋˜๋Š” DBํ…Œ์ด๋ธ” ๊ตฌ์กฐ์˜ ๋ณ€๊ฒฝ ๋“ฑ ๋” ์ž์„ธํ•œ ์‚ฌํ•ญ์„ ํ™•์ธํ•˜๋ ค๋ฉดย AIP-39 ๋ฌธ์„œ๋ฅผ ์ฐธ๊ณ ํ•ด ์ฃผ์„ธ์š”.

Airflow ์Šค์ผ€์ฅด๋ง - New: Timetable

์•ž์„œ ํ˜„์žฌย schedule_interval์€ ํฌ๋ก  ํ‘œํ˜„์‹(Cron expression)๊ณผย timedelta๋กœ๋งŒ ํ‘œํ˜„ํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์—ย '๊ณตํœด์ผ์„ ์ œ์™ธํ•œ ์˜์—…์ผ์—๋งŒ ์ž‘์—…'ย ๊ณผ ๊ฐ™์ด ์Šค์ผ€์ค„ ์ค‘๊ฐ„์ค‘๊ฐ„์— ๊ตฌ๋ฉ์ด ๋šซ๋ ค ์žˆ๋Š” ๋ณต์žกํ•œ ํ˜•ํƒœ์˜ ์Šค์ผ€์ค„์„ ์ •์˜ํ•  ์ˆ˜ ์—†์—ˆ์Šต๋‹ˆ๋‹ค.
ํ•˜์ง€๋งŒ Airflow 2.2๋ฅผ ๊ธฐ์ ์œผ๋กœย Timetable(https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html)์ด๋ผ๋Š” ์ƒˆ๋กœ์šด ํด๋ž˜์Šค๊ฐ€ ๋„์ž…๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์ž์„ธํ•œ ์‚ฌ์šฉ ๋ฐฉ๋ฒ• ๊ด€๋ จํ•ด์„œ๋Š” ์•„๋ž˜ ์ž๋ฃŒ๋ฅผ ์ฐธ๊ณ ํ•ด์ฃผ์„ธ์š”