(1) 작업이 I/O 바운드 (네트워크, 파일, DB)? → stack 지원하면 asyncio, 아니면 threading. (2) 작업이 CPU 바운드? → multiprocessing. (3) 둘 다? → I/O 부분엔 asyncio, async 함수 안에서 process pool 로 CPU 작업 offload (asyncio.to_thread, loop.run_in_executor).
asyncio.to_thread — sync 를 async 로 다리
3.9+ 가 await asyncio.to_thread(sync_fn, *args) 추가. sync_fn 을 스레드에서 실행, 완료 await, 결과 반환. async 코드에서 blocking 라이브러리 호출 + 이벤트 루프 안 막기에 완벽. cwkPippa 가 가진 몇 sync-only 연산에 사용.
cwkPippa 예 — async-끝까지 모양
cwkPippa 가 위에서 아래까지 asyncio. 웹 layer (FastAPI) async. DB 접근 (aiosqlite) async. Claude/Codex/Gemini SDK 모두 async streaming 노출. JSONL 쓰기가 async 파일 I/O. 모든 chat 요청이 전체 stack 통해 단일 async coroutine 으로 — 스레드 풀 X, 프로세스 풀 X, 단지 수천 기다리는 I/O 사이에 dispatching 하는 이벤트 루프.
Free-threaded 미래 (3.13+, 실험)
Python 3.13 이 실험적 free-threaded 빌드 (GIL 없음) 추가. 안정 + 널리 지원되면 "CPU 바운드는 multiprocessing" 계산 변경 — 스레드가 실제 병렬화. 오늘 (2026 초) 옵트인, 아직 디폴트 X. 오늘 99% 코드엔 GIL 여전히 적용, multiprocessing-for-CPU 답이 맞음.
War Story: Pippa 의 Claude-Pippa 어댑터가 한 번 청크별 타임아웃 위해 __anext__ 둘레에 asyncio.wait_for 시도. 도구 호출 동안 타임아웃 발생, 취소가 async generator 종료, 이후 next 호출이 StopIteration raise. 해결책 — 청크별 X, 전체 stream 둘레의 타임아웃. async 취소가 미묘 — load-bearing 구조 코드처럼 다뤄.
Code
결정: I/O 작업 — asyncio 가 답·python
import asyncio
import aiohttp # async HTTP 클라이언트
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.text()
async def fetch_all(urls):
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(fetch(session, u) for u in urls))
# (데모만 — asyncio.run(fetch_all(urls)) 로 실행)
# 수백 동시 fetch, 단일 스레드, 한 이벤트 루프
결정: CPU 작업 — multiprocessing 이 답·python
from concurrent.futures import ProcessPoolExecutor
def heavy_compute(n):
total = 0
for i in range(n):
total += i * i
return total
if __name__ == '__main__':
with ProcessPoolExecutor() as pool:
results = list(pool.map(heavy_compute, [10**7] * 4))
print(results)
# 4 코어 실제 병렬 사용, GIL 제약 X
혼합 — async + offload CPU 작업·python
import asyncio
def heavy_compute(n): # sync, CPU 바운드
total = 0
for i in range(n):
total += i * i
return total
async def main():
# 이벤트 루프 안 막게 스레드 (또는 프로세스) 로 offload
result = await asyncio.to_thread(heavy_compute, 10_000_000)
print(result)
asyncio.run(main())
# 프로세스 레벨엔 — loop.run_in_executor(ProcessPoolExecutor(), fn, *args)
threading 이 맞는 호출일 때·python
# threading 이 이기는 특정 케이스:
# - sync 라이브러리 사용해야 (async 버전 X)
# - 지금 async 로 리팩터 안 하는 중
# - 작업이 I/O 바운드
#
# 예 — sync 만인 legacy DB 드라이버.
# asyncio.to_thread 또는 ThreadPoolExecutor 로 감싸.
#
# 무턱대고 threading 디폴트 X — 사람들 가정보다 적은 경우에 답.
# asyncio 가 거의 모든 현대 I/O 라이브러리 지원,
# threading 은 갭 메우기.
각 시나리오에 어느 모델 맞는지 결정 + 한 문장 정당화 — (a) 100 웹 페이지 병렬 다운로드. (b) 다중 코어 머신에서 1000 이미지 리사이즈. (c) 수천 websocket 연결 처리하는 웹 서버. (d) async FastAPI 엔드포인트 안에서 sync DB 라이브러리 호출. (e) N=200 까지 피보나치 시퀀스 계산 (재귀, 캐시 X).
Progress
Progress is local-only — sign in to sync across devices.