C.W.K.
Stream
Lesson 01 of 07 · published

ConnectionManager 클래스

~13 min · management, manager, metadata

Level 0Poller
0 XP0/60 lessons0/10 achievements
0/120 XP to next level120 XP to go0% complete

왜 dict 가 아니라 클래스

Track 3 의 minimal manager 는 set 의 dict. 실전 application 은 세 차원 더 필요: connection 별 metadata (언제 connect, 어느 IP 에서, 어느 session?), user 별 lookup (지금 user X 의 connection 은?), observability (현재 connection 수, room 별, user 별, 버전별?). 클래스가 이걸 한 자리에 모아 — 모듈 여러 곳에 흩뿌리지 말고.

세 인덱스

거의 항상 세 lookup 방향 필요: room → connection set, user_id → connection, connection → metadata. 작은 dict 셋; 모든 작업이 셋 다 일관되게 update. 셋 중 하나 빠뜨리면 production 의 'stale connection' 버그 다 거기서 나와.

Code

Production-shape manager·python
from fastapi import WebSocket
from typing import Dict, Set, Optional
import time

class ConnectionManager:
    def __init__(self):
        self.rooms: Dict[str, Set[WebSocket]] = {}
        self.user_map: Dict[str, WebSocket] = {}
        self.ws_meta: Dict[WebSocket, dict] = {}

    async def connect(self, ws: WebSocket, user_id: str, room: str):
        await ws.accept()
        self.rooms.setdefault(room, set()).add(ws)
        self.user_map[user_id] = ws
        self.ws_meta[ws] = {
            'user_id': user_id,
            'room': room,
            'connected_at': time.time(),
            'ip': ws.client.host if ws.client else None,
        }

    def disconnect(self, ws: WebSocket):
        meta = self.ws_meta.pop(ws, None)
        if meta is None:
            return
        room = meta['room']
        members = self.rooms.get(room)
        if members:
            members.discard(ws)
            if not members:
                self.rooms.pop(room, None)
        existing = self.user_map.get(meta['user_id'])
        if existing is ws:
            self.user_map.pop(meta['user_id'], None)

    async def send_to_user(self, user_id: str, message: dict) -> bool:
        ws = self.user_map.get(user_id)
        if ws is None:
            return False
        try:
            await ws.send_json(message)
            return True
        except Exception:
            self.disconnect(ws)
            return False

    async def broadcast(self, room: str, message: dict, *, exclude: Optional[WebSocket] = None):
        dead = []
        for ws in list(self.rooms.get(room, ())):
            if ws is exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)

    @property
    def total_connections(self) -> int:
        return len(self.ws_meta)

    def room_users(self, room: str) -> list[str]:
        return [
            self.ws_meta[ws]['user_id']
            for ws in self.rooms.get(room, ())
            if ws in self.ws_meta
        ]

External links

Exercise

/admin/connections HTTP endpoint 추가해서 {rooms, total, users_per_room} 리턴. 테스트 클라 셋 connect 한 채로 방문; 숫자 옳은지 확인. 한 클라 force-quit 하고 새로고침 — 카운트가 옛 값에 안 멈추고 몇 초 안에 update 되어야 함.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.