C.W.K.
Stream
Lesson 06 of 06 · published

Observability — 로그, metric, run log

~11 min · observability, logging, monitoring

Level 0구경꾼
0 XP0/47 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

안 보이는 건 못 고쳐

"내 파이프라인 돌아" 에서 "내 파이프라인 observable" 로의 전환은 스크립트에서 시스템으로의 전환과 같아. Observability 는 세 layer 있고 각 layer 가 다른 사고 클래스 해결.

  • 로그 — 뭐 일어났는지, 순서대로, 사람 읽을 수 있는 형태. 디버깅용.
  • Metric — 모든 stage 에서 emit 된 count, duration, rate. 트렌드와 알림용.
  • Run log — 시작 시간, 종료 시간, 성공/실패, stage 별 in/out row count, 에러의 per-run 기록. Forensic 과 SLA 추적용.

구조화 로그는 비협상

일반 텍스트 로그 ('started extract') 는 터미널 보는 사람한테 OK. Production grade 로그는 구조화 (key=value 또는 JSON), log aggregator 가 indexing, 패턴 알림, run 간 query 가능하게. Python logging 모듈이 두 줄 config + 중요한 값 포함 습관으로 거기 도달.

Code

scaling 되는 구조화 로깅 설정·python
import json
import logging
from datetime import datetime

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            'ts': datetime.utcnow().isoformat() + 'Z',
            'level': record.levelname,
            'name': record.name,
            'msg': record.getMessage(),
        }
        # logger.info(..., extra={'kv': {...}}) 로 넘긴 추가물
        if hasattr(record, 'kv'):
            payload.update(record.kv)
        return json.dumps(payload, ensure_ascii=False)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])
log = logging.getLogger('orders_pipeline')

log.info('extract complete', extra={'kv': {
    'stage': 'extract', 'rows': 12_345, 'ms': 1834,
}})
Run-log 레코드 — 대시보드가 query 하는 산출물·python
import time
from dataclasses import dataclass, field

@dataclass
class RunLog:
    pipeline: str
    window: str
    started_at: float = field(default_factory=time.time)
    stages: dict = field(default_factory=dict)
    success: bool = False
    error: str | None = None
    finished_at: float | None = None

    def stage(self, name: str, **kv) -> None:
        self.stages[name] = {'ms': int(kv.pop('ms', 0)), **kv}

    def to_row(self) -> dict:
        return {
            'pipeline':    self.pipeline,
            'window':      self.window,
            'started_at':  self.started_at,
            'finished_at': self.finished_at,
            'duration_s':  (self.finished_at or time.time()) - self.started_at,
            'success':     self.success,
            'error':       self.error,
            **{f'stage_{k}': v for k, v in self.stages.items()},
        }

External links

Exercise

본인 파이프라인 중 하나에 구조화 JSON 로깅과 per-run run-log 레코드 추가. 다음 run 후 run-log row 를 Parquet 또는 DuckDB 테이블에 작성. 시간 따른 duration plot. 그 plot 이 "이 파이프라인 느려지고 있어?" 의 답 — observability 없으면 답 못 하는 질문.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.