C.W.K.
Stream
Lesson 03 of 05 · published

Dagster — Asset-First Orchestration

~12 min · dagster, assets, orchestration

Level 0구경꾼
0 XP0/47 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

Asset 으로 생각하면 뭐 바뀌나

Dagster (2026.4 기준 1.10+) 는 "무슨 task 돌리지?" 대신 "무슨 데이터 asset 만들지?" 묻는 orchestrator. Asset 은 함수가 materialize 하는 논리적 데이터 객체 — 테이블, Parquet 디렉토리, ML 모델, 리포트. DAG 는 asset 사이 의존성 그래프에서 자동 emerge.

작은 전환처럼 들리지. 실전에선 대화 자체가 바뀜. "orders 가 stale 인 이유는 upstream customers asset 이 화요일 이후 refresh 안 돼서" 가 Dagster UI 가 말할 수 있는 문장; Airflow 에선 task status 보고 데이터한테 뭔 의미인지 추론.

Dagster 프로젝트의 모양

  • Asset@asset decorate 된 Python 함수. Materialize 된 객체 반환 (또는 IO manager 통해 작성).
  • IO Manager — asset 이 어떻게 읽히고 쓰여지는지의 plug 가능 전략 (S3 의 Parquet, Postgres, in-memory 등).
  • Resource — asset 에 inject 되는 재사용 가능 config (DB 연결, API 클라이언트).
  • Sensor / schedule — asset 이 언제 re-materialize 될지.

Code

Asset 셋으로 표현된 Dagster 파이프라인·python
import pandas as pd
from dagster import asset, AssetExecutionContext

@asset(group_name='orders')
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    df = pd.read_csv('source/orders.csv', dtype=str)
    context.add_output_metadata({'rows': len(df)})
    return df

@asset(group_name='orders')
def clean_orders(context: AssetExecutionContext, raw_orders: pd.DataFrame) -> pd.DataFrame:
    df = (raw_orders
        .assign(order_date=lambda d: pd.to_datetime(d['order_date']),
                amount_usd=lambda d: pd.to_numeric(d['amount_usd']))
        .dropna(subset=['order_id', 'amount_usd']))
    context.add_output_metadata({'rows': len(df), 'dropped': len(raw_orders) - len(df)})
    return df

@asset(group_name='orders')
def monthly_revenue(clean_orders: pd.DataFrame) -> pd.DataFrame:
    return (clean_orders
        .assign(month=lambda d: d['order_date'].dt.to_period('M'))
        .groupby('month', as_index=False)['amount_usd'].sum()
        .rename(columns={'amount_usd': 'revenue'}))

External links

Exercise

새 venv 에서 pip install dagster dagster-webserver pandas. 위 3-asset 예시를 파일에 복사. dagster dev -f <file>.py. localhost:3000 에서 UI 열고 "Materialize all" 클릭, lineage 그래프 렌더 보기. 데이터 의존성이 orchestration 인 거 알아채.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.