Airflow 핵심 개념: DAG·Task·Operator·Sensor·XCom 정리
Airflow 5대 핵심 개념(DAG, Task, Operator, Sensor, XCom) 정리
아래에서는 각 개념(DAG, Task, Operator, Sensor, XCom)을 정의 → 왜 필요한가 → 예시 → 주의사항/베스트 프랙티스 순서로 정리한다.
1) DAG (Directed Acyclic Graph)
정의: DAG는 Airflow에서 워크플로(파이프라인)의 설계도이다. 작업(Task) 간의 의존 관계를 방향성이 있는 비순환 그래프로 표현한다. 여기에 스케줄, 시작일(start_date), 기본 파라미터(default_args) 같은 실행 규칙이 담긴다.
왜 필요한가: 파이프라인의 전체 실행 순서, 주기, 재시도 정책을 한 곳에서 선언적으로 관리할 수 있다. 개발자가 개별 스크립트 실행 타이밍을 직접 맞출 필요가 없다.
간단 코드:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime, timedelta
from airflow import DAG
default_args = {
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
dag_id="example_daily_pipeline",
start_date=datetime(2025, 9, 10),
schedule_interval="0 3 * * *", # 매일 03:00 KST
catchup=False,
default_args=default_args,
tags=["daily"],
)
주의/베스트 프랙티스
start_date
는 과거 시점으로 두고catchup=False
로 시작해야 과도한 과거 실행(백필)을 방지할 수 있다.- DAG는 설계도일 뿐, 실제 작업 로직은 Task/Operator가 수행한다.
2) Task
정의: Task는 DAG 안에서 실행되는 작업 단위(노드)이다. 데이터 추출, 변환, 적재 같은 원자적 스텝을 의미한다.
왜 필요한가: 각 단계를 명확히 분리하면 실패/재시도, 로그, 병목 파악이 쉽다. 또한 병렬화가 가능해 전체 실행 시간을 줄일 수 있다.
간단 코드 (의존성 지정 예):
1
2
3
4
5
6
7
from airflow.operators.empty import EmptyOperator
start = EmptyOperator(task_id="start", dag=dag)
clean = EmptyOperator(task_id="clean_and_standardize", dag=dag)
load = EmptyOperator(task_id="load_to_rds", dag=dag)
start >> clean >> load # 실행 순서: start → clean → load
주의/베스트 프랙티스
- Task는 단일 책임만 담당하도록 설계해야 한다.
- 실패해도 재실행 가능하도록 멱등성을 고려해야 한다.
3) Operator
정의: Operator는 “어떤 일을 어떻게 할지” 정의한 Task의 템플릿(클래스)이다. Task는 Operator의 인스턴스이다.
왜 필요한가: 파일 이동, SQL 실행, HTTP 호출, 파이썬 함수 실행 등 표준화된 동작을 재사용 가능한 클래스로 제공한다.
주요 종류
- Action Operators:
PythonOperator
,BashOperator
,EmailOperator
등 - Transfer Operators: 시스템 간 데이터 이동 (예: S3 ↔️ DB)
- Branch Operators: 조건 분기 실행 (
BranchPythonOperator
) - Deferrable Operators: 비동기/이벤트 기반 대기
간단 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
def clean_func(**ctx):
# pandas로 정제/표준화 수행
return {"clean_key": "s3://bucket/clean/dt=20250911/part-000.csv"}
clean_task = PythonOperator(
task_id="clean_and_standardize",
python_callable=clean_func,
dag=dag,
)
upsert_task = PostgresOperator(
task_id="upsert_to_rds",
postgres_conn_id="example_rds",
sql="""
INSERT INTO shop_clean (...) VALUES (...)
ON CONFLICT (biz_id) DO UPDATE SET ...;
""",
dag=dag,
)
주의/베스트 프랙티스
- 대용량 데이터는 XCom에 넣지 말고 S3 경로나 매니페스트만 주고받아야 한다.
- 연결 정보는 Connections/Secrets로 관리해야 한다.
4) Sensor
정의: Sensor는 특정 조건이 충족될 때까지 대기하는 Operator의 한 종류이다. 파일 존재, 파티션 생성, 외부 작업 완료 등 이벤트 기반 상태를 확인한다.
왜 필요한가: 데이터 준비 전에 변환을 시작하는 문제를 방지한다. 데이터 파이프라인의 정합성을 확보하는 핵심 요소이다.
주요 모드
- poke: 주기적 폴링. 간단하지만 워커 점유
- reschedule: 폴링 사이에 워커 반납 → 리소스 절약
- deferrable: 이벤트 루프 기반 대기 → 더 효율적
간단 코드
1
2
3
4
5
6
7
8
9
10
11
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_raw = S3KeySensor(
task_id="wait_raw_csv",
bucket_key="raw/dt=/data.csv",
bucket_name="example-bucket",
poke_interval=60,
timeout=60*60,
mode="reschedule",
dag=dag,
)
주의/베스트 프랙티스
- 가능하면
mode="reschedule"
또는 deferrable 센서를 사용해야 한다. timeout
과 실패 시 후속 처리 방안을 명확히 설정해야 한다.
5) XCom (Cross-Communication)
정의: XCom은 Task 간에 소량의 메타데이터를 교환하는 저장소이다. PythonOperator
의 리턴값은 자동으로 XCom에 저장된다.
왜 필요한가: 정제 결과 파일 경로, 행 개수, 분기 기준 값 같은 작은 정보를 다음 Task로 넘길 수 있다.
TaskFlow API 예시
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval="0 3 * * *", start_date=datetime(2025, 9, 10), catchup=False, tags=["example"])
def example_taskflow():
@task()
def clean_and_standardize():
return {"clean_key": "s3://bucket/clean/dt=/part-000.csv"}
@task()
def load_to_rds(payload: dict):
clean_key = payload["clean_key"]
# 해당 경로 사용해 COPY/INSERT 실행
load_to_rds(clean_and_standardize())
example = example_taskflow()
클래식 방식 (pull/push)
1
2
# push: return 값 자동 XCom 저장
# pull: ti.xcom_pull(key="return_value", task_ids="clean_and_standardize")
주의/베스트 프랙티스
- XCom에는 수 KB~수십 KB의 작은 값만 넣어야 한다. 데이터 본문은 S3, DB 등 외부 스토리지에 저장한다.
- 키 충돌을 피하려면 명시적 키를 사용하거나 TaskFlow API로 타입 안정성을 확보한다.
- 불필요한 XCom은 청소해야 한다.
한눈에 보는 관계도
flowchart LR
A[DAG 파이프라인 설계도] --> B[Task 작업 단위]
B --> C1[Operator 행동 템플릿]
B --> C2[Sensor 조건 대기]
B -->|XCom: 메타데이터 전달| B2[다음 Task]
자주 하는 오해
- DAG = 스케줄만? → ❌ DAG는 스케줄 + 기본 파라미터 + 의존 구조를 포함한 설계도이다.
- Task = 함수 호출? → ❌ Airflow가 관리하는 실행 단위이다.
- XCom으로 데이터까지? → ❌ XCom은 작은 메타데이터만 주고받는다.
- Sensor는 비효율적이다? → ◯/❌ 설계 방식에 따라 다르다.
reschedule
/deferrable 센서를 사용하면 효율적으로 운용 가능하다. - Operator vs Task? → Operator는 클래스(템플릿), Task는 그 인스턴스(실행 노드)이다.
체크리스트
- DAG는
catchup=False
로 시작했는가? - Task는 단일 책임 원칙을 지켰는가?
- Sensor는
mode="reschedule"
또는 deferrable인가?timeout
이 설정되었는가? - XCom에는 S3 키, 카운트, 플래그만 담고 본문은 외부 저장소에 있는가?
- Connections/Secrets로 자격증명을 안전하게 관리하는가?