Post

Airflow 핵심 개념: DAG·Task·Operator·Sensor·XCom 정리

Airflow 5대 핵심 개념(DAG, Task, Operator, Sensor, XCom) 정리

Airflow 핵심 개념: DAG·Task·Operator·Sensor·XCom 정리

아래에서는 각 개념(DAG, Task, Operator, Sensor, XCom)을 정의 → 왜 필요한가 → 예시 → 주의사항/베스트 프랙티스 순서로 정리한다.


1) DAG (Directed Acyclic Graph)

정의: DAG는 Airflow에서 워크플로(파이프라인)의 설계도이다. 작업(Task) 간의 의존 관계를 방향성이 있는 비순환 그래프로 표현한다. 여기에 스케줄, 시작일(start_date), 기본 파라미터(default_args) 같은 실행 규칙이 담긴다.

왜 필요한가: 파이프라인의 전체 실행 순서, 주기, 재시도 정책을 한 곳에서 선언적으로 관리할 수 있다. 개발자가 개별 스크립트 실행 타이밍을 직접 맞출 필요가 없다.

간단 코드:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="example_daily_pipeline",
    start_date=datetime(2025, 9, 10),
    schedule_interval="0 3 * * *",  # 매일 03:00 KST
    catchup=False,
    default_args=default_args,
    tags=["daily"],
)

주의/베스트 프랙티스

  • start_date는 과거 시점으로 두고 catchup=False로 시작해야 과도한 과거 실행(백필)을 방지할 수 있다.
  • DAG는 설계도일 뿐, 실제 작업 로직은 Task/Operator가 수행한다.

2) Task

정의: Task는 DAG 안에서 실행되는 작업 단위(노드)이다. 데이터 추출, 변환, 적재 같은 원자적 스텝을 의미한다.

왜 필요한가: 각 단계를 명확히 분리하면 실패/재시도, 로그, 병목 파악이 쉽다. 또한 병렬화가 가능해 전체 실행 시간을 줄일 수 있다.

간단 코드 (의존성 지정 예):

1
2
3
4
5
6
7
from airflow.operators.empty import EmptyOperator

start = EmptyOperator(task_id="start", dag=dag)
clean = EmptyOperator(task_id="clean_and_standardize", dag=dag)
load = EmptyOperator(task_id="load_to_rds", dag=dag)

start >> clean >> load  # 실행 순서: start → clean → load

주의/베스트 프랙티스

  • Task는 단일 책임만 담당하도록 설계해야 한다.
  • 실패해도 재실행 가능하도록 멱등성을 고려해야 한다.

3) Operator

정의: Operator는 “어떤 일을 어떻게 할지” 정의한 Task의 템플릿(클래스)이다. Task는 Operator의 인스턴스이다.

왜 필요한가: 파일 이동, SQL 실행, HTTP 호출, 파이썬 함수 실행 등 표준화된 동작을 재사용 가능한 클래스로 제공한다.

주요 종류

  • Action Operators: PythonOperator, BashOperator, EmailOperator
  • Transfer Operators: 시스템 간 데이터 이동 (예: S3 ↔️ DB)
  • Branch Operators: 조건 분기 실행 (BranchPythonOperator)
  • Deferrable Operators: 비동기/이벤트 기반 대기

간단 코드

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator


def clean_func(**ctx):
    # pandas로 정제/표준화 수행
    return {"clean_key": "s3://bucket/clean/dt=20250911/part-000.csv"}

clean_task = PythonOperator(
    task_id="clean_and_standardize",
    python_callable=clean_func,
    dag=dag,
)

upsert_task = PostgresOperator(
    task_id="upsert_to_rds",
    postgres_conn_id="example_rds",
    sql="""
        INSERT INTO shop_clean (...) VALUES (...)
        ON CONFLICT (biz_id) DO UPDATE SET ...;
    """,
    dag=dag,
)

주의/베스트 프랙티스

  • 대용량 데이터는 XCom에 넣지 말고 S3 경로나 매니페스트만 주고받아야 한다.
  • 연결 정보는 Connections/Secrets로 관리해야 한다.

4) Sensor

정의: Sensor는 특정 조건이 충족될 때까지 대기하는 Operator의 한 종류이다. 파일 존재, 파티션 생성, 외부 작업 완료 등 이벤트 기반 상태를 확인한다.

왜 필요한가: 데이터 준비 전에 변환을 시작하는 문제를 방지한다. 데이터 파이프라인의 정합성을 확보하는 핵심 요소이다.

주요 모드

  • poke: 주기적 폴링. 간단하지만 워커 점유
  • reschedule: 폴링 사이에 워커 반납 → 리소스 절약
  • deferrable: 이벤트 루프 기반 대기 → 더 효율적

간단 코드

1
2
3
4
5
6
7
8
9
10
11
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_raw = S3KeySensor(
    task_id="wait_raw_csv",
    bucket_key="raw/dt=/data.csv",
    bucket_name="example-bucket",
    poke_interval=60,
    timeout=60*60,
    mode="reschedule",
    dag=dag,
)

주의/베스트 프랙티스

  • 가능하면 mode="reschedule" 또는 deferrable 센서를 사용해야 한다.
  • timeout과 실패 시 후속 처리 방안을 명확히 설정해야 한다.

5) XCom (Cross-Communication)

정의: XCom은 Task 간에 소량의 메타데이터를 교환하는 저장소이다. PythonOperator의 리턴값은 자동으로 XCom에 저장된다.

왜 필요한가: 정제 결과 파일 경로, 행 개수, 분기 기준 값 같은 작은 정보를 다음 Task로 넘길 수 있다.

TaskFlow API 예시

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval="0 3 * * *", start_date=datetime(2025, 9, 10), catchup=False, tags=["example"])
def example_taskflow():
    @task()
    def clean_and_standardize():
        return {"clean_key": "s3://bucket/clean/dt=/part-000.csv"}

    @task()
    def load_to_rds(payload: dict):
        clean_key = payload["clean_key"]
        # 해당 경로 사용해 COPY/INSERT 실행

    load_to_rds(clean_and_standardize())

example = example_taskflow()

클래식 방식 (pull/push)

1
2
# push: return 값 자동 XCom 저장
# pull: ti.xcom_pull(key="return_value", task_ids="clean_and_standardize")

주의/베스트 프랙티스

  • XCom에는 수 KB~수십 KB의 작은 값만 넣어야 한다. 데이터 본문은 S3, DB 등 외부 스토리지에 저장한다.
  • 키 충돌을 피하려면 명시적 키를 사용하거나 TaskFlow API로 타입 안정성을 확보한다.
  • 불필요한 XCom은 청소해야 한다.

한눈에 보는 관계도

flowchart LR
    A[DAG 파이프라인 설계도] --> B[Task 작업 단위]
    B --> C1[Operator 행동 템플릿]
    B --> C2[Sensor 조건 대기]
    B -->|XCom: 메타데이터 전달| B2[다음 Task]

자주 하는 오해

  • DAG = 스케줄만? → ❌ DAG는 스케줄 + 기본 파라미터 + 의존 구조를 포함한 설계도이다.
  • Task = 함수 호출? → ❌ Airflow가 관리하는 실행 단위이다.
  • XCom으로 데이터까지? → ❌ XCom은 작은 메타데이터만 주고받는다.
  • Sensor는 비효율적이다? → ◯/❌ 설계 방식에 따라 다르다. reschedule/deferrable 센서를 사용하면 효율적으로 운용 가능하다.
  • Operator vs Task? → Operator는 클래스(템플릿), Task는 그 인스턴스(실행 노드)이다.

체크리스트

  • DAG는 catchup=False로 시작했는가?
  • Task는 단일 책임 원칙을 지켰는가?
  • Sensor는 mode="reschedule" 또는 deferrable인가? timeout이 설정되었는가?
  • XCom에는 S3 키, 카운트, 플래그만 담고 본문은 외부 저장소에 있는가?
  • Connections/Secrets로 자격증명을 안전하게 관리하는가?
This post is licensed under CC BY 4.0 by the author.