C.W.K.
Stream
Lesson 02 of 06 · published

FastAPI와 long-running worker에서 AsyncAnthropic

~16 min · async, fastapi, concurrency

Level 0Observer
0 XP0/64 lessons0/13 achievements
0/150 XP to next level150 XP to go0% complete

요청당 X, 프로세스당 클라이언트 하나

Async 클라이언트는 connection pool 유지. 매 FastAPI 라우트에서 새 AsyncAnthropic() 만들면 connection setup 오버헤드 태우고 pool 효과 막아. 시작 시 한 번 빌드, app state나 모듈 레벨 싱글톤에 두고, 어디서든 재사용.

asyncio.gather로 동시 호출

한 요청이 여러 Claude 호출 필요할 때(병렬 요약, multi-shot 평가, fan-out), asyncio.gather가 동시 실행. SDK async 클라이언트가 connection 재사용·rate-limit backoff을 gather 아래서 깔끔히 처리 — fan-out에 페널티 없어.

Cancellation·timeout 제대로

클라이언트의 upstream HTTP 요청이 canceled되면(브라우저 닫힘, FastAPI CancelledError raise), in-flight Anthropic 호출도 cancel되길 원하지. SDK가 asyncio cancellation 존중. 전체 호출 timeout엔 async with asyncio.timeout(); stream iteration을 asyncio.wait_for로 감싸지 (generator 죽임 — Messages 트랙 참고).

원칙: Async 클라이언트는 오래 살고 공유되길 원해. 클라이언트를 DB connection pool처럼 다뤄, request-scoped 객체 X.

Code

공유 async 클라이언트 가진 FastAPI lifespan·python
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from anthropic import AsyncAnthropic

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.claude = AsyncAnthropic()
    yield
    # AsyncAnthropic이 내부에서 httpx 사용; connection pool 우아하게 닫기.
    await app.state.claude.close()

app = FastAPI(lifespan=lifespan)

@app.post("/summarize")
async def summarize(request: Request, body: dict):
    client: AsyncAnthropic = request.app.state.claude
    resp = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": body["text"]}],
    )
    return {"summary": resp.content[0].text}
asyncio.gather로 동시 fan-out·python
import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def classify(text: str) -> str:
    r = await client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=32,
        system='Reply with one word: positive, negative, or neutral.',
        messages=[{"role": "user", "content": text}],
    )
    return r.content[0].text.strip()

async def main():
    async with asyncio.timeout(30):
        labels = await asyncio.gather(
            classify("I love this"),
            classify("meh"),
            classify("this is awful"),
        )
    print(labels)

asyncio.run(main())

External links

Exercise

매 요청마다 새 AsyncAnthropic 만드는 FastAPI 라우트를 lifespan 동안 공유하게 리팩터. 50 sequential 요청에 대해 time-to-first-byte를 before/after 측정.
Hint
30-50ms 떨어진 거 봤으면 그게 손해 본 connection-pool 재사용분.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.