C.W.K.
Stream
Lesson 04 of 04 · published

NDJSON → SSE proxy

~20 min · streaming, sse, proxy

Level 0Downloader
0 XP0/41 lessons0/11 achievements
0/120 XP to next level120 XP to go0% complete

왜 이 proxy 만들어?

대부분 cloud-shape chat UI랑 SDK (OpenAI Python SDK, Vercel AI SDK, LangChain streaming hook)가 SSE 소비. Frontend 다시 안 짜고 Ollama drop-in하려면 NDJSON → SSE 변환하는 작은 proxy 만들어.

변환의 모양

  • Ollama emit {json}\n; SSE는 data: {json}\n\n 원함.
  • OpenAI-compatible SSE client는 끝에 data: [DONE]\n\n sentinel 기대.
  • data: 안의 chunk shape는 보통 OpenAI delta shape으로 reformat 필요: {"choices": [{"delta": {"content": "..."}}]}.

OpenAI 호환성 reinvent하지 마

Ollama 이미 /v1/chat/completions를 OpenAI-compatible endpoint로 노출해. "drop-in OpenAI SDK"만 필요하면 SDK base_url을 http://localhost:11434/v1로 가리키면 proxy skip 가능. Ollama compat endpoint가 안 하는 변환 (커스텀 auth, observability, request shaping) 필요할 때만 자체 proxy 만들어.

Code

FastAPI proxy: NDJSON → OpenAI delta shape SSE·python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import httpx, json

app = FastAPI()

async def ollama_to_openai_sse(model: str, messages: list[dict]):
    """Ollama NDJSON을 OpenAI-shape SSE로 bridge."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:11434/api/chat",
            json={"model": model, "messages": messages, "stream": True},
        ) as r:
            async for line in r.aiter_lines():
                if not line:
                    continue
                chunk = json.loads(line)
                content = chunk.get("message", {}).get("content", "")
                done = chunk.get("done", False)

                payload = {
                    "id": "ollama-stream",
                    "object": "chat.completion.chunk",
                    "model": model,
                    "choices": [{
                        "index": 0,
                        "delta": {"content": content} if content else {},
                        "finish_reason": "stop" if done else None,
                    }],
                }
                yield f"data: {json.dumps(payload)}\n\n"
                if done:
                    yield "data: [DONE]\n\n"
                    return

@app.post("/v1/chat/completions")
async def chat_completions(req: dict):
    return StreamingResponse(
        ollama_to_openai_sse(req["model"], req["messages"]),
        media_type="text/event-stream",
    )
아니면 Ollama 내장 OpenAI compat 그냥 써·bash
# Drop-in OpenAI 호환만 원하면 proxy 안 필요
# OpenAI client를 이렇게 가리키기:
#   base_url = http://localhost:11434/v1
#   api_key = "ollama"   (빈 값 아닌 아무거나)

# OpenAI Python SDK
python3 - <<'PY'
from openai import OpenAI
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
stream = client.chat.completions.create(
    model="qwen2.5:7b",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="", flush=True)
PY

External links

Exercise

FastAPI proxy 만들어서 OpenAI Python SDK를 자체 proxy URL에 가리켜서 test. 그다음 같은 SDK를 http://localhost:11434/v1 직접 가리키기. 동작 어디서 갈리는지 메모 (거의 같아야 — 발견한 차이 list).

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.