C.W.K.
Stream
Lesson 07 of 07 · published

Redis Pub/Sub Bridge

~14 min · management, scaling, redis, pubsub

Level 0Poller
0 XP0/60 lessons0/10 achievements
0/120 XP to next level120 XP to go0% complete

Single-server 천장

FastAPI 프로세스 한 대가 수만 WebSocket connection 들 수 있어, 백만은 못 들어. horizontally scale 하려면 — load balancer 뒤 여러 서버 — 구조적 문제 직면: 같은 room 의 connection 이 다른 서버에 살 수 있어. 서버 A 의 broadcast 가 message bus 공유 안 하면 서버 B 의 user 한테 못 도달.

Redis Pub/Sub 이 표준 답

각 서버가 outgoing message 를 Redis channel 에 publish 하고 같은 channel 에 subscribe. Redis 가 모든 subscriber 한테 fan out. 각 서버가 그 후 local connection 한테만 broadcast. 패턴이 수천 서버 + 수백만 connection 까지 작동, central state 없음.

cwkPippa 의 비슷한 패턴

cwkPippa 는 의도적으로 single-server (집의 Mac Studio). 근데 피파의 Council 패턴 — user 의 한 질문이 여러 brain process 로 fan out 후 응답이 converge — 이 같은 모양. transport 가 다를 뿐 (subprocess pipe, Redis 아님), 'fan out, fan in' 은 분산 real-time 에 universal.

Code

Redis 를 통한 manager·python
import json
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket
from typing import Dict, Set
import asyncio

class RedisManager:
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        self.local: Dict[str, Set[WebSocket]] = {}
        self._task: asyncio.Task | None = None

    async def start(self):
        await self.pubsub.psubscribe('ws:room:*')
        self._task = asyncio.create_task(self._listen())

    async def stop(self):
        if self._task:
            self._task.cancel()
        await self.pubsub.aclose()
        await self.redis.aclose()

    async def join(self, ws: WebSocket, room: str):
        await ws.accept()
        self.local.setdefault(room, set()).add(ws)

    def leave(self, ws: WebSocket, room: str):
        members = self.local.get(room)
        if members:
            members.discard(ws)
            if not members:
                self.local.pop(room, None)

    async def publish(self, room: str, message: dict):
        '''All servers (including this one) will fan it out locally.'''
        await self.redis.publish(f'ws:room:{room}', json.dumps(message))

    async def _listen(self):
        async for msg in self.pubsub.listen():
            if msg.get('type') != 'pmessage':
                continue
            channel = msg['channel'].decode()
            room = channel.split(':', 2)[2]
            data = json.loads(msg['data'])
            for ws in list(self.local.get(room, ())):
                try:
                    await ws.send_json(data)
                except Exception:
                    self.leave(ws, room)
Topology·text
  Server A              Redis              Server B
  (A, B in #general)   pub/sub            (C, D in #general)
        |                |                       |
  user A sends "hi"      |                       |
        |--- publish --->|                       |
        |                |--- pmessage --------->|
        |                |                       v
        |                |                C and D get "hi"
        v                |
  A and B get "hi"
  (via local fan-out)

External links

Exercise

FastAPI 서버 두 대 다른 포트 (8001, 8002) 에 둘 다 한 Redis 에 연결. 각각에 클라 열어. 8001 클라에서 message 보내면 8002 클라가 Redis bridge 통해 받아야 함. Redis 멈추고 cross-server fan-out 멈추는지 확인; restart 후 재개되는지 확인.

Progress

Progress is local-only — sign in to sync across devices.
이 페이지에서 버그를 발견하셨거나 피드백이 있으세요?문제 신고

댓글 0

🔔 답글 알림 (로그인 필요)
로그인댓글을 남기려면 로그인해 주세요.

아직 댓글이 없어요. 첫 댓글을 남겨보세요.