C.W.K.
Stream
Lesson 02 of 05 · published

Server-Sent Events — One-Way Streaming, 이미 네 브라우저에

~11 min · streaming-async, sse, event-stream, ai-streaming

Level 0HTTP Newbie
0 XP0/46 lessons0/12 achievements
0/120 XP to next level120 XP to go0% complete
"SSE 가 모든 AI 채팅 앱이 token stream 에 쓰는 protocol. HTTP/1.1 위 텍스트. EventSource 가 브라우저 JS 5 줄로 처리. 한 가지 — server 가 client 한테 event push — 잘 함."

SSE 가 뭔지

Server-Sent Events 가 one-way HTTP streaming 위한 W3C 명세: server 가 장기 HTTP/1.1 (혹은 2 나 3) response 위로 client 한테 이름 붙은 event 시퀀스 push. 연결이 client 가 시작 (정상 GET) 하고 열림 유지; server 가 event 일어나면 씀.

왜 존재: server 가 push 해야 하는데 client 는 push 안 해도 되는 application class (채팅 token, real-time dashboard, log tailing, 알림) 있음. WebSocket 이 이거 가능한데 protocol-upgrade 복잡도 추가. SSE 가 그냥 특별 content type 과 body format 가진 장기 HTTP response.

Wire format — 추측보다 단순

Content-Type: text/event-stream. Body 가 single newline 으로 분리된 필드 type 셋 가진 UTF-8 텍스트, event 는 blank line 으로 분리:

event: message
data: {"role":"assistant","content":"Hi "}
id: 42

event: message
data: {"role":"assistant","content":"아빠."}
id: 43

event: done
data: {"final_id":"m_abc"}

필드:

  • event: — event 이름. Optional; 기본 "message".
  • data: — payload (보통 JSON). Multi-line data 는 data: 줄 여러 개, \n 으로 연결.
  • id: — 재연결 위한 event ID. 브라우저가 마지막 본 ID 기억하고 재연결 시 Last-Event-ID header 로 보냄.
  • retry: — 연결 손실 시 client 가 재시도 전 ms 수.
  • blank line — 한 event 와 다음 event 분리.

브라우저 쪽 — EventSource 가 5 줄

현대 브라우저가 모든 거 처리하는 native EventSource API ship:

const es = new EventSource('/api/chat/stream');
es.addEventListener('message', (e) => render(JSON.parse(e.data)));
es.addEventListener('done', (e) => es.close());
es.onerror = () => { /* es.close() 안 하면 auto-재연결 */ };

브라우저가 재연결, Last-Event-ID, parsing, event dispatch 처리. 5 줄 쓰고 streaming UI 있음.

Server 쪽 — Async streaming response

Server-쪽 SSE 가 그냥 event 일어나면 쓰는 장기 HTTP response. FastAPI 에서:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json, asyncio

app = FastAPI()

@app.get('/api/chat/stream')
async def chat_stream():
    async def event_stream():
        for i, token in enumerate(['Hi', ' ', '아빠', '.']):
            yield f'event: message\ndata: {json.dumps({"content": token})}\nid: {i}\n\n'
            await asyncio.sleep(0.1)
        yield 'event: done\ndata: {}\n\n'
    return StreamingResponse(event_stream(), media_type='text/event-stream')

Body 가 텍스트 yield 하는 async generator. Starlette/FastAPI 가 각 yield 를 즉시 wire 에 flush. Client 가 각 event 도착 시 봄.

SSE 가 HTTP 가 이미 하는 거 하는데 단지 천천히. 새 protocol 없음, 새 연결 upgrade 없음, async-capable 런타임 외 특별 server 요구 없음. '스트리밍' 이 그냥 ms 대신 분 걸리는 HTTP response. 모든 reverse proxy, 모든 CDN, 모든 HTTP 라이브러리가 아래 transport 이해.

Gotcha

1. Response buffering 비활성. Nginx, Apache, 다른 reverse proxy 가 기본 response buffer — event 가 proxy 에 누적되어 response 가 완전 쓰여질 때까지. X-Accel-Buffering: no (Nginx) 혹은 등가 추가해 proxy 한테 즉시 통과 알려.

2. Chunked encoding 신경. SSE response 가 Transfer-Encoding: chunked 씀 — body 길이 사전 모름. Content-Length 설정 안 함.

3. Heartbeat 가 timeout 예방. Intermediary 가 idle 장기 연결 닫을 수 있음. 15-30s 마다 comment 줄 (: heartbeat\n\n) 보내서 연결 따뜻 유지. 브라우저가 comment 무시.

4. CORS 여전히 적용. Cross-origin SSE 가 server 가 Access-Control-Allow-Origin 보내야. EventSource API 가 다른 fetch 처럼 CORS 존중.

cwkPippa 의 SSE 현실

cwkPippa 의 POST /api/chat 가 이 codebase 의 canonical SSE producer — 각 provider 에서 Claude/Codex/Gemini/Ollama token 이 도착하면 stream. Server-쪽 패턴이 backend/adapters/claude.pystream method 에 살아, Starlette 의 StreamingResponse 통해 event yield. Frontend 의 useChat hook (frontend/src/hooks/useChat.ts) 가 custom SSE parser 통해 소비 (EventSource 아님 — POST semantics 원함, EventSource 가 POST 미지원 — POST + 수동 EventStream parsing). JSONL ground-truth log 가 wire 로 가기 전 모든 event 캡처, 그래서 연결 끊긴 client 가 재연결 시 log 에서 대화 재 render 가능.

Code

SSE wire format — header, comment heartbeat, 이름 붙은 event·http
# Wire 위 raw SSE response
GET /api/chat/stream HTTP/1.1
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Transfer-Encoding: chunked
X-Accel-Buffering: no

: heartbeat

event: message
data: {"content":"Hi "}
id: 1

event: message
data: {"content":"아빠."}
id: 2

event: done
data: {"final_id":"m_abc"}

# (server 가 끝나면 연결 닫음; 혹은 client 가 충분히 봤으면 닫음)
FastAPI SSE producer — async generator + StreamingResponse·python
# FastAPI server — backpressure 가진 streaming SSE
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json, asyncio

app = FastAPI()

async def stream_tokens():
    """Upstream LLM 에서 token 도착 시 SSE-formatted event yield."""
    for i, token in enumerate(['Hi', ' ', '아빠', '.']):
        # SSE event: 'event:' 줄 + 'data:' 줄 + 'id:' 줄 + blank 구분
        yield f'event: message\ndata: {json.dumps({"content": token})}\nid: {i}\n\n'
        await asyncio.sleep(0.1)
    # Client 가 깔끔히 닫게 terminal event
    yield 'event: done\ndata: {}\n\n'

@app.get('/api/chat/stream')
async def chat_stream():
    return StreamingResponse(
        stream_tokens(),
        media_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',  # Nginx buffering 비활성
        },
    )
브라우저: EventSource (5 줄) 혹은 POST 위해 fetch + 수동 SSE parser·javascript
// 브라우저 client — native EventSource 로 5 줄
const es = new EventSource('/api/chat/stream');
es.addEventListener('message', (e) => {
  const { content } = JSON.parse(e.data);
  appendToUi(content);
});
es.addEventListener('done', () => es.close());
es.onerror = () => console.warn('SSE 끊김; 브라우저 자동 재연결');

// POST semantics 필요하면 (request 와 body 보냄 — 채팅에 필수),
// EventSource 가 POST 미지원. 대신 streaming reader 가진 fetch:
const resp = await fetch('/api/chat/stream', {
  method: 'POST',
  headers: {'Content-Type': 'application/json'},
  body: JSON.stringify({message: 'hi pippa'}),
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buf = '';
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buf += decoder.decode(value, { stream: true });
  for (const event of parseSSEFromBuffer(buf)) handle(event);
  buf = remainder(buf);
}

External links

Exercise

30초 동안 초 당 한 번 현재 시간 stream 하고 'end' event 발신하는 FastAPI SSE endpoint GET /clock 만들어. 브라우저 EventSource 에서 소비. 그 다음 중간에 60초 침묵 일부러 추가하고 연결 끊김 (Nginx 나 런타임 timeout) 봐. 15s 마다 heartbeat comment 추가하고 연결 생존 봐. 보너스: buffering 비활성 안 하고 reverse proxy 통해 endpoint 일부러 서빙하고 event 가 어떻게 누적되고 한 번에 flush 되는지 관찰.
Hint
EventSource 가 연결 끊기면 auto-재연결 — 그래서 60s 침묵이 연결 떨궈도 브라우저가 몇 초 후 새 SSE 세션 시작. Heartbeat fix 가 15초 마다 yield 하는 : \n\n (글자 그대로 colon-space-newline-newline). Buffering 보너스가 가장 교육적 — buffering 꺼지면 event stream 하는 거 SEE, 켜지면 뭉쳐서 도착.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.