C.W.K.
Stream
Lesson 04 of 07 · published

httpx Streaming — async iter_lines + SSE parse

~22 min · httpx-stream, sse-parse

Level 0Tokenizer
0 XP0/54 lessons0/10 achievements
0/120 XP to next level120 XP to go0% complete

Async streaming 패턴 — async with client.stream("POST", url, headers=h, json=body) as response:. Block 안에서 async for line in response.aiter_lines(): 가 SSE frame 을 yield. Framing (data: prefix, empty-line separator, [DONE] terminator) 은 dev 가 parse.

가장 큰 실수

Streaming block 안에서 response.aread() 또는 response.json() 호출. 둘 다 전체 body 수집 — streaming 무효. iterate 하거나 streaming 안 한다고 인정하거나.

SSE parse 단계

  1. Line stripping
  2. 'data: ' prefix 분리
  3. '[DONE]' 감지 → break
  4. JSON parse
  5. Event type 으로 dispatch

iter_lines vs iter_bytes

aiter_lines 는 줄 단위, aiter_bytes 는 raw byte chunk. SSE 는 줄 기반이라 aiter_lines 가 적합. iter_bytes 는 binary streaming (e.g. audio download) 용.

Code

client.stream('POST', url, json=...)·python
import os, json, httpx, asyncio
from typing import AsyncIterator

async def stream_chat_async(
    messages: list[dict], model: str = "gpt-4o-mini",
) -> AsyncIterator[str]:
    """Yield text delta strings from a streaming response."""
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        "Content-Type": "application/json",
    }
    body = {"model": model, "messages": messages, "stream": True}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, headers=headers, json=body) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or line == "data: [DONE]":
                    continue
                if line.startswith("data: "):
                    try:
                        chunk = json.loads(line[len("data: "):])
                        content = chunk["choices"][0]["delta"].get("content", "")
                        if content:
                            yield content
                    except (json.JSONDecodeError, KeyError, IndexError):
                        pass

async def main():
    async for token in stream_chat_async([{"role": "user", "content": "Count to 5."}]):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

External links

Exercise

Raw httpx 로 Responses 호출 stream. SSE frame 직접 parse. (1) response.output_text.delta event 수, (2) 조립된 텍스트, (3) 본 non-text event 모두 print.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.