C.W.K.
Stream
Lesson 02 of 04 · published

httpx로 Python streaming

~22 min · streaming, python, httpx

Level 0Downloader
0 XP0/41 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

패턴

제대로 된 Python Ollama streaming client가 챙기는 세 가지:

  1. timeout=None — Cold model의 첫 토큰 latency가 수십 초 갈 수 있어. Default httpx timeout이 warm-up 중간에 stream 죽임. None으로 설정하고 server에 맡겨.
  2. iter_lines() — HTTP client가 newline까지 buffer하다가 완성된 줄 yield. Raw byte 읽어서 직접 줄 재조립하지 마.
  3. flush=True — 토큰 출력할 때. 안 하면 stdio buffer가 streaming 느낌 삼켜.

Production에서 어디 쓰여

이게 자체 backend의 streaming endpoint 빌딩 블록. 피파의 Ollama vessel이 정확히 이 패턴 — Ollama에서 NDJSON stream, 각 chunk를 universal StreamChunk로 변환, JSONL ground truth에 write, frontend로 yield.

흔한 실수

  • timeout=None 잊기. Default timeout이 cold start에서 죽임.
  • 줄 단위가 아니라 char 단위 buffering. iter_text()는 network 경계에서 chunk되고, iter_lines()가 경계 문제 처리해줘.
  • 빈 마지막 chunk 처리 안 함. done: true가 빈 content와 함께 옴. Print 전에 break.

Code

Production-grade streamer·python
import httpx
import json
from typing import Iterator

def stream_chat(model: str, messages: list[dict],
                base_url: str = "http://localhost:11434",
                **options) -> Iterator[dict]:
    """NDJSON chunk를 /api/chat에서 stream. 파싱된 dict yield."""
    payload = {"model": model, "messages": messages, "stream": True}
    if options:
        payload["options"] = options

    with httpx.stream("POST", f"{base_url}/api/chat",
                      json=payload, timeout=None) as r:
        r.raise_for_status()
        for line in r.iter_lines():
            if not line:
                continue
            yield json.loads(line)

# 사용 — 토큰 도착하는 대로 print, metric 캡처
full = ""
for chunk in stream_chat("qwen2.5:7b",
                          [{"role": "user", "content": "Explain MLX in 3 sentences."}]):
    if chunk.get("done"):
        eval_count = chunk.get("eval_count", 0)
        eval_dur_ns = chunk.get("eval_duration", 1)
        tps = eval_count / (eval_dur_ns / 1e9)
        print(f"\n[{eval_count} tokens @ {tps:.1f} tok/s]")
        break
    delta = chunk.get("message", {}).get("content", "")
    full += delta
    print(delta, end="", flush=True)
httpx.AsyncClient로 async 버전·python
import httpx, json
from typing import AsyncIterator

async def stream_chat_async(model: str, messages: list[dict]) -> AsyncIterator[dict]:
    """Async streaming — FastAPI / asyncio server에 선호."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:11434/api/chat",
            json={"model": model, "messages": messages, "stream": True},
        ) as r:
            r.raise_for_status()
            async for line in r.aiter_lines():
                if not line:
                    continue
                yield json.loads(line)

# Async context에서 사용
async def main():
    async for chunk in stream_chat_async(
        "qwen2.5:7b",
        [{"role": "user", "content": "What is GGUF?"}],
    ):
        if chunk.get("done"):
            break
        print(chunk["message"]["content"], end="", flush=True)

External links

Exercise

stream_chat(model, messages) 동기 generator랑 async 버전 둘 다 구현. 같은 prompt를 양쪽으로 stream하고 마지막 chunk에서 tok/s 출력. 숫자 거의 같아야 — 다르면 timeout 설정 확인.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.