C.W.K.
Stream
Lesson 02 of 05 · published

Implementing OllamaAdapter

~26 min · adapters, ollama


What this adapter does

Translates Ollama's NDJSON streaming output into the universal StreamChunk. It handles message content, tool calls, and the final done frame, which carries timing data. It also implements a health check (HTTP 200 from /api/version) and model listing (/api/tags).
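
To make the wire format concrete, here is a small sketch that splits an NDJSON body into frames and rebuilds the reply text. The sample body is hand-written for illustration (the values are made up); the field names message.content, done, eval_count and eval_duration follow the shape of Ollama's /api/chat streaming response.

```python
import json

# Hand-written sample of an Ollama /api/chat streaming body (illustrative values only).
SAMPLE_BODY = "\n".join([
    '{"model": "qwen2.5:7b", "message": {"role": "assistant", "content": "Hel"}, "done": false}',
    '{"model": "qwen2.5:7b", "message": {"role": "assistant", "content": "lo!"}, "done": false}',
    "",  # blank lines can appear and should simply be skipped
    '{"model": "qwen2.5:7b", "message": {"role": "assistant", "content": ""}, "done": true,'
    ' "eval_count": 2, "eval_duration": 40000000}',
])


def parse_ndjson(body: str) -> list[dict]:
    """Decode one JSON object per non-empty line (NDJSON)."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


frames = parse_ndjson(SAMPLE_BODY)
text = "".join(f.get("message", {}).get("content", "") for f in frames)
print(len(frames), repr(text), frames[-1]["done"])  # 3 'Hello!' True
```

Each content frame carries only a delta, so the full reply is the concatenation of every message.content; only the last frame has done set and the timing counters.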

Translation map

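Ollama field → StreamChunk field
message.content → content
message.tool_calls → tool_calls (passed through as-is; already parsed dicts)
done: true → done = True, plus tokens_per_sec (computed from eval_count and eval_duration)
(none) → thinking (used only by reasoning models that emit it)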
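
The map can be sketched as one pure function over a decoded frame. The StreamChunk dataclass here is a hypothetical stand-in with only the fields the map mentions (the project's real class may have more), and translate_frame / tokens_per_sec are illustrative names. Ollama reports eval_duration in nanoseconds, hence the division by 1e9.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class StreamChunk:
    # Hypothetical stand-in for the universal StreamChunk; the real class may differ.
    content: str = ""
    tool_calls: list[dict] | None = None
    done: bool = False
    tokens_per_sec: float | None = None
    raw: dict | None = None


def tokens_per_sec(eval_count: int, eval_duration_ns: int | None) -> float:
    """Tokens generated per second; returns 0.0 if the duration is missing or zero."""
    if not eval_duration_ns:
        return 0.0
    return eval_count / (eval_duration_ns / 1e9)


def translate_frame(frame: dict[str, Any]) -> StreamChunk:
    """Map one decoded Ollama frame onto a StreamChunk, following the translation map."""
    if frame.get("done"):
        tps = tokens_per_sec(frame.get("eval_count", 0), frame.get("eval_duration"))
        return StreamChunk(done=True, tokens_per_sec=tps, raw=frame)
    msg = frame.get("message", {})
    if msg.get("tool_calls"):
        # Already parsed dicts: pass them through untouched.
        return StreamChunk(tool_calls=msg["tool_calls"], raw=frame)
    return StreamChunk(content=msg.get("content", ""), raw=frame)


done = translate_frame({"done": True, "eval_count": 50, "eval_duration": 2_000_000_000})
print(done.tokens_per_sec)  # 25.0
```

Checking done first means the final frame never produces a stray empty content chunk.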

Where are errors handled?

Inside the adapter's stream(). Connection refused → OllamaUnavailable (the orchestrator catches it and triggers a fallback). Read timeout → ModelLoadingTimeout. Malformed JSON line → OllamaProtocolError. Each exception carries enough context to be logged meaningfully.
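
The hand-off to the orchestrator might look like the sketch below. Everything in it is hypothetical: AdapterError records base_url and model as one way of giving each error loggable context, and stream_with_fallback switches to the second adapter only if the first fails before yielding anything, so a reply is never half from one model and half from another.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Callable


class AdapterError(Exception):
    """Hypothetical base class: every adapter error records where it happened."""

    def __init__(self, message: str, *, base_url: str, model: str) -> None:
        super().__init__(f"{message} (base_url={base_url}, model={model})")
        self.base_url = base_url
        self.model = model


class OllamaUnavailable(AdapterError): ...
class ModelLoadingTimeout(AdapterError): ...
class OllamaProtocolError(AdapterError): ...


StreamFn = Callable[[list[dict]], AsyncIterator[str]]


async def stream_with_fallback(primary: StreamFn, fallback: StreamFn,
                               messages: list[dict]) -> AsyncIterator[str]:
    """Yield from primary; switch to fallback only if primary is unavailable before any output."""
    started = False
    try:
        async for piece in primary(messages):
            started = True
            yield piece
        return
    except OllamaUnavailable as e:
        if started:
            raise  # output already sent: surface the error instead of mixing models
        print(f"falling back: {e}")  # stand-in for real logging
    async for piece in fallback(messages):
        yield piece


async def dead_ollama(messages: list[dict]) -> AsyncIterator[str]:
    raise OllamaUnavailable("daemon not reachable",
                            base_url="http://localhost:11434", model="qwen2.5:7b")
    yield ""  # unreachable; makes this function an async generator


async def other_provider(messages: list[dict]) -> AsyncIterator[str]:
    for piece in ("Hi", " there"):
        yield piece


async def collect() -> str:
    return "".join([p async for p in stream_with_fallback(dead_ollama, other_provider, [])])


print(asyncio.run(collect()))  # Hi there
```

ModelLoadingTimeout and OllamaProtocolError are deliberately not caught here; whether those should also trigger a fallback is a policy choice for your orchestrator.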

Code

Full OllamaAdapter (Python)
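import json
from typing import Any, AsyncIterator

import httpx

# AIAdapter and StreamChunk are assumed to come from the project's shared adapter
# module (the universal interface this lesson implements); import them from there.

class OllamaUnavailable(Exception): ...     # daemon not reachable -> orchestrator falls back
class ModelLoadingTimeout(Exception): ...   # no data within the read timeout (model still loading)
class OllamaProtocolError(Exception): ...   # a streamed line that is not valid JSON

class OllamaAdapter(AIAdapter):
    def __init__(self, base_url: str = "http://localhost:11434",
                 model: str = "qwen2.5:7b",
                 read_timeout: float = 120.0):
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.read_timeout = read_timeout
        # Short connect timeout so a dead daemon fails fast; a long read timeout
        # tolerates a slow first token while the model loads (120 s is an assumed default).
        # timeout=None would mean a read timeout, and so ModelLoadingTimeout, never fires.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(10.0, connect=5.0, read=read_timeout))

    async def stream(
        self,
        messages: list[dict],
        tools: list[dict] | None = None,
        **opts: Any,
    ) -> AsyncIterator[StreamChunk]:
        payload: dict = {"model": self.model, "messages": messages, "stream": True}
        if tools:
            payload["tools"] = tools
        if opts:
            payload["options"] = opts  # model options such as temperature

        try:
            async with self.client.stream("POST", f"{self.base_url}/api/chat",
                                          json=payload) as r:
                r.raise_for_status()
                async for line in r.aiter_lines():
                    if not line:
                        continue  # skip blank NDJSON lines
                    try:
                        chunk = json.loads(line)
                    except json.JSONDecodeError as e:
                        raise OllamaProtocolError(
                            f"bad NDJSON from {self.model}: {line[:80]!r}") from e

                    msg = chunk.get("message", {})

                    if chunk.get("done"):
                        # Final frame: eval_duration is in nanoseconds. Guard a missing
                        # or zero value (defaulting it to 1 would inflate tps enormously).
                        ec = chunk.get("eval_count", 0)
                        ed = chunk.get("eval_duration")
                        tps = ec / (ed / 1e9) if ed else 0.0
                        yield StreamChunk(done=True, tokens_per_sec=tps,
                                          model=self.model, raw=chunk)
                        return

                    if msg.get("tool_calls"):
                        # Already parsed dicts: pass through unchanged.
                        yield StreamChunk(tool_calls=msg["tool_calls"], raw=chunk)
                        continue

                    yield StreamChunk(content=msg.get("content", ""), raw=chunk)
        except httpx.ConnectError as e:
            raise OllamaUnavailable(
                f"Ollama daemon not reachable at {self.base_url}") from e
        except httpx.ReadTimeout as e:
            raise ModelLoadingTimeout(
                f"no data from {self.model} within {self.read_timeout}s") from e

    async def health_check(self) -> bool:
        try:
            # Per-request 2 s timeout keeps the check fast even when the daemon is down.
            r = await self.client.get(f"{self.base_url}/api/version", timeout=2.0)
            return r.status_code == 200
        except Exception:
            return False

    async def list_models(self) -> list[str]:
        # /api/tags returns {"models": [{"name": "qwen2.5:7b", ...}, ...]}
        r = await self.client.get(f"{self.base_url}/api/tags")
        r.raise_for_status()
        return [m["name"] for m in r.json().get("models", [])]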
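
One way to test the NDJSON handling without a running daemon is to pull the line loop out into a function that accepts any async line source. This is a refactoring suggestion rather than part of the adapter above; iter_frames and fake_lines are hypothetical names, and fake_lines stands in for httpx's Response.aiter_lines().

```python
from __future__ import annotations

import asyncio
import json
from typing import AsyncIterable, AsyncIterator


class OllamaProtocolError(Exception): ...


async def iter_frames(lines: AsyncIterable[str]) -> AsyncIterator[dict]:
    """Decode NDJSON frames from any async line source; stop after the done frame."""
    lineno = 0
    async for line in lines:
        lineno += 1
        if not line.strip():
            continue
        try:
            frame = json.loads(line)
        except json.JSONDecodeError as e:
            # Include position and a snippet so the log line is actionable.
            raise OllamaProtocolError(f"bad NDJSON at line {lineno}: {line[:80]!r}") from e
        yield frame
        if frame.get("done"):
            return


async def fake_lines(*lines: str) -> AsyncIterator[str]:
    """Test double for Response.aiter_lines()."""
    for line in lines:
        yield line


async def collect(*lines: str) -> list[dict]:
    return [f async for f in iter_frames(fake_lines(*lines))]


frames = asyncio.run(collect(
    '{"message": {"content": "a"}, "done": false}',
    "",
    '{"done": true}',
    '{"never": "read"}',
))
print([f.get("done") for f in frames])  # [False, True]
```

The same fake source lets you replay a captured tool-call stream, which is handy for the exercise below.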


Exercise

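Implement an OllamaAdapter with these three methods in your own project. Test stream() end to end with a simple chat. Send a tool definition to test the tool_calls translation. Test health_check() with the daemon stopped: it should return False quickly (within 3 seconds).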
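
For the last part, the timing check can avoid depending on whether a daemon happens to be running: point the probe at a port where nothing listens. The sketch uses a stdlib-only probe (probe_version and unused_port are hypothetical helpers) that speaks just enough HTTP/1.0 to read the status line of GET /api/version. In your project you would time your own health_check() the same way.

```python
import asyncio
import socket
import time


async def probe_version(host: str, port: int, timeout: float = 2.0) -> bool:
    """True only if GET /api/version answers HTTP 200 within `timeout` seconds."""

    async def _get() -> bool:
        reader, writer = await asyncio.open_connection(host, port)
        try:
            writer.write(f"GET /api/version HTTP/1.0\r\nHost: {host}\r\n\r\n".encode())
            await writer.drain()
            status_line = await reader.readline()  # e.g. b"HTTP/1.1 200 OK\r\n"
            parts = status_line.split()
            return len(parts) >= 2 and parts[1] == b"200"
        finally:
            writer.close()

    try:
        return await asyncio.wait_for(_get(), timeout)
    except (OSError, asyncio.TimeoutError):
        # Connection refused, unreachable host, or no answer in time: all mean "not healthy".
        return False


def unused_port() -> int:
    """Ask the OS for a free port, then release it so nothing is listening there."""
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


start = time.monotonic()
healthy = asyncio.run(probe_version("127.0.0.1", unused_port()))
elapsed = time.monotonic() - start
print(healthy, f"{elapsed:.2f}s")  # should print False and a small elapsed time
```

A refused connection usually fails almost instantly; the 2-second cap covers the case where packets are silently dropped, which keeps the check inside the 3-second budget.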
