C.W.K.
Stream
Lesson 01 of 06 · published

Extract — API, 파일, DB 에서 읽기

~13 min · etl, extract, api

Level 0구경꾼
0 XP0/47 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

첫 stage 가 가장 어려움

대부분 파이프라인 실패가 extract stage 에 살아 — 본인이 통제 안 하는 시스템과 코드가 닿는 곳이니까. API 가 throttle 하고, 화요일에 malformed JSON 반환하고, 알리지 않고 응답 schema 바꿔. CSV export 가 vendor 업데이트 후 포맷 바뀌고. DB 가 ORM 진화로 column type drift. Extract 의 규율은 source-of-truth byte 를 충실히 캡처하고, 받은 거 로깅하고, 타입 전쟁은 별도 stage 에서 검증이 싸우게 두는 거야.

두 패턴

  • Bulk full extract — 매 run 에 전체 source 끌어와. 단순, idempotent, state 없음. Source 가 작거나 거의 안 바뀔 때 OK.
  • Incremental extract — 마지막 run 이후 바뀐 것만, watermark 컬럼 (updated_at >= last_run) 또는 change feed 사용. 빠른데 watermark 관리해야 해.

비협상 항목

  • 지수 백오프 retry: transient 에러 (HTTP 429/5xx) 에.
  • 페이지네이션 정확히 처리 — 대부분 API 는 페이지당 ≤1000 row 반환; 마지막 페이지가 full page 미만 반환하거나 API 가 끝났다고 할 때까지 loop.
  • raw 응답 캡처, parsed 결과만이 아니라. API 가 화요일에 쓰레기 반환하면 디버깅용 raw JSON 원해.
  • User-Agent 설정 으로 본인 파이프라인 식별. Upstream 운영자가 트래픽 spike 이메일 보낼 때 본인 찾을 수 있어야 해.

Code

방어적 페이지네이션 + retry API extractor·python
import time
import logging
from typing import Iterator
import httpx

log = logging.getLogger('extract')

def fetch_orders(client: httpx.Client, since: str) -> Iterator[dict]:
    url = 'https://api.example.com/v1/orders'
    params = {'updated_since': since, 'page_size': 1000}
    page = 1
    while True:
        for attempt in range(5):
            try:
                resp = client.get(url, params={**params, 'page': page})
                if resp.status_code in (429, 500, 502, 503, 504):
                    raise httpx.HTTPStatusError('transient', request=resp.request, response=resp)
                resp.raise_for_status()
                break
            except httpx.HTTPStatusError as e:
                wait = 2 ** attempt
                log.warning('transient error %s, retry in %ds', e.response.status_code, wait)
                time.sleep(wait)
        else:
            raise RuntimeError(f'page {page} retry 다 소진')

        body = resp.json()
        for row in body['results']:
            yield row

        if not body.get('next_page'):
            return
        page += 1

with httpx.Client(
    headers={'User-Agent': 'cwk-de-pipeline/1.0 (ops@example.com)'},
    timeout=30.0,
) as client:
    rows = list(fetch_orders(client, since='2026-04-01T00:00:00Z'))

External links

Exercise

공개 REST API 골라 (GitHub, OpenWeather, free tier 주식 ticker). 페이지네이션 처리하고 transient 에러 retry 한 번 이상하고, parsing 전에 raw 응답을 disk 에 쓰는 extractor 작성. 그리고 raw 파일 읽어서 DataFrame 으로 parse 하는 별도 함수 작성. 분리가 양쪽을 독립 테스트 가능하게 만드는 거 알아채.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.