C.W.K.
Stream
Lesson 03 of 06 · published

Load — Warehouse 또는 downstream 파일에 쓰기

~12 min · etl, load, idempotency

Level 0구경꾼
0 XP0/47 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

Load stage 가 idempotency 가장 중요한 곳

Extract 와 transform 은 자유롭게 retry 가능 — 입력은 read-only 고 출력은 in-memory. Load 는 다른 사람이 읽는 destination 에 쓰는데, 즉 버그 있는 load 가 downstream 리포트, 대시보드, ML 학습 데이터를 corrupt 시킬 수 있어. 이 stage 잘못하면 결과가 본인 머신을 떠나.

네 가지 canonical write 패턴

  • Replace — 전체 target 덮어쓰기. 가장 안전한 의미; 작은 데이터셋만 viable.
  • Append — 새 row 추가. 가장 단순하지만 다시 돌리면 row 두 배. 다른 dedup 로직 없으면 파이프라인에서 금지.
  • Partition replace — 단일 partition (예: date=2026-04-30) atomic 교체. 시간 partitioned warehouse 의 default.
  • Upsert — primary key 로 merge. 새 거면 insert, 있으면 update. 가장 깔끔한 의미, destination 지원 필요 (Postgres ON CONFLICT, warehouse 의 MERGE).

Code

Atomic partition replace — staging 에 쓰고 swap·python
from pathlib import Path
import shutil
import pandas as pd

def load_partition(df: pd.DataFrame, root: Path, partition: str) -> Path:
    target = root / f'date={partition}'
    staging = root / f'.staging_{partition}'

    if staging.exists():
        shutil.rmtree(staging)
    staging.mkdir(parents=True)
    df.to_parquet(staging / 'part-0.parquet', index=False, compression='zstd')

    if target.exists():
        shutil.rmtree(target)
    staging.rename(target)
    return target

load_partition(df, Path('warehouse/orders'), '2026-04-30')
Postgres ON CONFLICT 로 upsert·python
import psycopg

def upsert_orders(rows: list[dict]) -> None:
    sql = '''
        INSERT INTO orders (order_id, customer_id, amount_usd, order_date, updated_at)
        VALUES (%(order_id)s, %(customer_id)s, %(amount_usd)s, %(order_date)s, NOW())
        ON CONFLICT (order_id) DO UPDATE
        SET customer_id = EXCLUDED.customer_id,
            amount_usd  = EXCLUDED.amount_usd,
            order_date  = EXCLUDED.order_date,
            updated_at  = NOW()
    '''
    with psycopg.connect('postgresql://...') as conn:
        with conn.cursor() as cur:
            cur.executemany(sql, rows)

External links

Exercise

warehouse/<table>/date=<partition>/part-0.parquet 에 staging 디렉토리 통해 쓰는 partition-replace loader 구현. 같은 partition 에 두 번 돌려 — 두 번째 run 이 동일 출력을 만들고 disk 에 추가물 안 남기게.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.