C.W.K.
Stream
Lesson 02 of 07 · published

Tool Loop Orchestration — 두 layer, 두 책임

~22 min · orchestration, tool-loop

Level 0Tokenizer
0 XP0/54 lessons0/10 achievements
0/120 XP to next level120 XP to go0% complete

Adapter 는 wire 알아 — provider 별 메시지 format, event parse, tool result shape. Orchestrator 는 loop 알아 — handler 호출 시점, JSONL log 시점, break 시점, hook apply 시점.

Adapter 안에 loop 박는 게 가장 흔한 실수

'Adapter 가 다 처리!' 가 simpler 처럼 느껴짐 — 하지만 per-call instrumentation 불가. Single tool call 에 logging hook 추가 X. Handler 실행 전 arguments 가로채기 X. Wire 와 독립적으로 loop replay-test X.

Orchestrator 패턴

Adapter 가 typed event yield (TextDelta, ToolCallRequested, ...). Orchestrator 가 ToolCallRequested 받으면 → handler 호출 → 결과를 다음 호출의 input 으로 → hook 박기 (on_tool_call, on_text_delta, on_done) 가능.

Replay-based test 의 base

Orchestrator 가 loop 를 own 하면 captured event sequence 를 CI 에서 replay 가능. JSONL → mock adapter → orchestrator 동작 검증. Wire-level diff 없이 logic regression 잡음.

Code

Adapter 가 아니라 Orchestrator 가 loop own·python
import asyncio, json, logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class AgentConfig:
    model: str = "gpt-4.1"
    max_iterations: int = 10        # Safety limit on tool loops
    tool_timeout_seconds: float = 30.0

class ProductionAgent:
    def __init__(self, adapter, tools, tool_schemas, config=None):
        self.adapter = adapter
        self.tools = tools            # name → callable
        self.tool_schemas = tool_schemas
        self.config = config or AgentConfig()

    async def run(self, user_message, conversation_history=None):
        messages = conversation_history or [
            {"role": "system", "content": "You are a helpful assistant."}
        ]
        messages.append({"role": "user", "content": user_message})

        for iteration in range(self.config.max_iterations):
            text, tool_calls = "", []
            async for chunk in self.adapter.stream(
                messages=messages, tools=self.tool_schemas
            ):
                if isinstance(chunk, TextDelta): text += chunk.content
                elif isinstance(chunk, ToolCallComplete):
                    tool_calls.append(chunk)

            if not tool_calls:
                messages.append({"role": "assistant", "content": text})
                return text, messages

            # Execute tools in parallel with per-tool timeout
            results = await asyncio.gather(*[
                self._execute_tool(tc) for tc in tool_calls
            ])
            # Add results to messages...

        return "Max iterations reached.", messages

    async def _execute_tool(self, tc):
        try:
            fn = self.tools[tc.name]
            args = json.loads(tc.arguments)
            return await asyncio.wait_for(fn(**args), timeout=self.config.tool_timeout_seconds)
        except asyncio.TimeoutError:
            return {"error": f"Tool {tc.name} timed out"}
        except Exception as e:
            return {"error": str(e)}

External links

Exercise

Tool loop 를 refactor — Orchestrator (Adapter 아님) 가 handler 호출. Hook on_tool_call(name, args) 추가, 실행 전 JSONL 에 log. 다른 동작 변화 0 verify.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.