Skip to main content

실시간 알림은 현대 웹 서비스에서 필수 기능이 되었습니다. 사용자가 새 메시지를 받거나, 결제가 완료되거나, 시스템 이벤트가 발생했을 때 즉시 알림을 받을 수 있어야 합니다. 이 글에서는 FastAPI WebSocket을 이용해 수천 명의 동시 접속을 처리하는 실시간 알림 시스템을 구축하는 방법을 단계별로 설명합니다.

WebSocket 실시간 알림 시스템

WebSocket vs HTTP Polling

전통적인 HTTP 폴링 방식은 클라이언트가 주기적으로 서버에 “새 알림 있어요?” 라고 요청합니다. 10초마다 폴링하면 1,000명 사용자 기준 분당 6,000번의 불필요한 요청이 발생합니다. 반면 WebSocket은 한 번 연결하면 서버에서 클라이언트로 데이터를 즉시 푸시할 수 있어 네트워크 비용이 획기적으로 줄어듭니다.

WebSocket의 주요 장점은 양방향 통신, 낮은 레이턴시, 헤더 오버헤드 최소화입니다. 특히 알림 시스템처럼 서버 클라이언트 방향의 데이터 흐름이 많은 경우 HTTP/2 Server-Sent Events(SSE)도 좋은 대안이지만, WebSocket은 클라이언트에서 서버 방향 통신도 동일한 커넥션으로 처리할 수 있어 더 유연합니다.

FastAPI WebSocket 기초 설정

FastAPI는 Starlette 기반으로 WebSocket을 기본 지원합니다. 별도 라이브러리 없이 websockets 패키지만 설치하면 됩니다.


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_json({"type": "ack", "message": data})
    except WebSocketDisconnect:
        print(f"User {user_id} disconnected")

연결 풀 관리자(ConnectionManager) 구현

단순히 WebSocket 엔드포인트를 만드는 것으로는 부족합니다. 수천 명의 동시 접속을 안전하게 관리하려면 연결 풀을 관리하는 전용 클래스가 필요합니다. 특히 특정 사용자에게만 알림을 보내거나, 연결이 끊긴 클라이언트에 전송을 시도하지 않도록 처리해야 합니다.


from typing import Dict, List
from fastapi import WebSocket
import asyncio, logging

logger = logging.getLogger(__name__)

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[int, List[WebSocket]] = {}
        self._lock = asyncio.Lock()

    async def connect(self, user_id: int, websocket: WebSocket):
        await websocket.accept()
        async with self._lock:
            if user_id not in self.active_connections:
                self.active_connections[user_id] = []
            self.active_connections[user_id].append(websocket)

    async def disconnect(self, user_id: int, websocket: WebSocket):
        async with self._lock:
            if user_id in self.active_connections:
                self.active_connections[user_id].remove(websocket)
                if not self.active_connections[user_id]:
                    del self.active_connections[user_id]

    async def send_to_user(self, user_id: int, data: dict):
        if user_id not in self.active_connections:
            return False
        disconnected = []
        for ws in self.active_connections[user_id]:
            try:
                await ws.send_json(data)
            except Exception:
                disconnected.append(ws)
        for ws in disconnected:
            await self.disconnect(user_id, ws)
        return True

    async def broadcast(self, data: dict):
        all_users = list(self.active_connections.keys())
        tasks = [self.send_to_user(uid, data) for uid in all_users]
        await asyncio.gather(*tasks, return_exceptions=True)

manager = ConnectionManager()

JWT 인증으로 WebSocket 보안 처리

WebSocket 연결에도 인증이 필요합니다. HTTP 요청과 달리 WebSocket은 Authorization 헤더를 직접 보내기 어렵기 때문에 쿼리 파라미터로 토큰을 전달하는 방식을 주로 사용합니다.


from fastapi import WebSocket, WebSocketDisconnect, Query
from jose import JWTError, jwt
import os

SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"

async def get_user_from_token(token: str) -> int:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if user_id is None:
            raise ValueError("Invalid token")
        return int(user_id)
    except JWTError:
        raise ValueError("Token verification failed")

@app.websocket("/ws/notifications")
async def notification_websocket(
    websocket: WebSocket,
    token: str = Query(...)
):
    try:
        user_id = await get_user_from_token(token)
    except ValueError:
        await websocket.close(code=4001, reason="Unauthorized")
        return

    await manager.connect(user_id, websocket)
    try:
        while True:
            msg = await asyncio.wait_for(
                websocket.receive_text(),
                timeout=60.0
            )
            if msg == "ping":
                await websocket.send_text("pong")
    except (WebSocketDisconnect, asyncio.TimeoutError):
        await manager.disconnect(user_id, websocket)

알림 이벤트 발행 – FastAPI Background Task 연동

실제 서비스에서는 결제 완료, 메시지 수신, 상태 변경 등의 이벤트가 발생할 때 해당 사용자에게 알림을 푸시해야 합니다. FastAPI의 Background Task를 통해 응답 지연 없이 알림을 전송할 수 있습니다.


from fastapi import BackgroundTasks
from pydantic import BaseModel
from datetime import datetime
from enum import Enum

class NotificationType(str, Enum):
    MESSAGE = "message"
    PAYMENT = "payment"
    SYSTEM = "system"

class NotificationPayload(BaseModel):
    type: NotificationType
    title: str
    body: str
    url: str | None = None
    created_at: str = datetime.now().isoformat()

async def push_notification(user_id: int, payload: NotificationPayload):
    sent = await manager.send_to_user(user_id, {
        "event": "notification",
        "data": payload.model_dump()
    })
    if sent:
        logger.info(f"Realtime notification sent to user {user_id}")
    else:
        logger.info(f"User {user_id} offline, saved to DB only")

@app.post("/api/v1/payments/{payment_id}/complete")
async def complete_payment(payment_id: int, background_tasks: BackgroundTasks):
    user_id = 123
    notification = NotificationPayload(
        type=NotificationType.PAYMENT,
        title="결제 완료",
        body=f"주문 #{payment_id} 결제가 완료되었습니다.",
        url=f"/orders/{payment_id}"
    )
    background_tasks.add_task(push_notification, user_id, notification)
    return {"status": "ok", "payment_id": payment_id}

SvelteKit 클라이언트 연동

프론트엔드에서는 WebSocket 연결을 관리하고 지수 백오프 방식으로 재연결 로직을 구현해야 합니다. 네트워크 장애 시 1초, 2초, 4초… 최대 30초 간격으로 자동 재연결합니다.


// src/lib/stores/notifications.ts
import { writable } from 'svelte/store';

export const notifications = writable([]);
export const wsConnected = writable(false);

let ws = null;
let reconnectTimer;
let reconnectDelay = 1000;

export function connectWebSocket(token) {
  const wsUrl = `${import.meta.env.VITE_WS_URL}/ws/notifications?token=${token}`;
  ws = new WebSocket(wsUrl);

  ws.onopen = () => {
    wsConnected.set(true);
    reconnectDelay = 1000;
    const heartbeat = setInterval(() => {
      if (ws?.readyState === WebSocket.OPEN) ws.send('ping');
      else clearInterval(heartbeat);
    }, 30000);
  };

  ws.onmessage = (event) => {
    const message = JSON.parse(event.data);
    if (message.event === 'notification') {
      notifications.update(list => [
        { id: crypto.randomUUID(), ...message.data, read: false },
        ...list
      ].slice(0, 50));
    }
  };

  ws.onclose = () => {
    wsConnected.set(false);
    reconnectTimer = setTimeout(() => {
      reconnectDelay = Math.min(reconnectDelay * 2, 30000);
      connectWebSocket(token);
    }, reconnectDelay);
  };
}

다중 서버 환경 – Redis Pub/Sub 연동

서버를 수평 확장할 경우 사용자가 서버 1에 연결되어 있는데 서버 2에서 발생한 이벤트를 전달해야 하는 상황이 생깁니다. Redis Pub/Sub를 사용하면 모든 서버가 동일한 채널을 구독하여 알림을 중계할 수 있습니다.


import redis.asyncio as aioredis
import json, asyncio

REDIS_URL = "redis://localhost:6379"

async def redis_subscriber(manager):
    redis = await aioredis.from_url(REDIS_URL)
    pubsub = redis.pubsub()
    await pubsub.subscribe("notifications")
    async for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            user_id = data.get("user_id")
            payload = data.get("payload")
            if user_id and payload:
                await manager.send_to_user(user_id, payload)

async def publish_notification(user_id: int, payload: dict):
    redis = await aioredis.from_url(REDIS_URL)
    await redis.publish("notifications", json.dumps({
        "user_id": user_id,
        "payload": payload
    }))
    await redis.aclose()

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(redis_subscriber(manager))

Nginx WebSocket 프록시 설정

Nginx로 WebSocket을 프록시할 때는 Upgrade와 Connection 헤더를 반드시 전달해야 하며, 타임아웃도 길게 설정해야 연결이 유지됩니다.


# /etc/nginx/sites-available/api.conf
server {
    listen 443 ssl;
    server_name api.yourdomain.com;

    location /ws/ {
        proxy_pass http://127.0.0.1:8000;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_buffering off;
    }

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }
}

정리 – 실시간 알림 시스템 체크리스트

  • ConnectionManager: asyncio.Lock으로 thread-safe 연결 풀 관리
  • JWT 인증: 쿼리 파라미터로 토큰 검증, 인증 실패 시 code 4001로 종료
  • Heartbeat: 30초 주기 ping/pong으로 좀비 연결 탐지
  • 지수 백오프 재연결: 네트워크 장애 시 클라이언트 자동 복구
  • Redis Pub/Sub: 다중 서버 환경에서 알림 동기화
  • Nginx 타임아웃: proxy_read_timeout 3600s 설정
  • DB 저장: 오프라인 사용자를 위한 미전송 알림 보관

코드벤터는 글로벌 협력 네트워크를 기반으로 FastAPI, SvelteKit, Redis 등 현대적인 기술 스택을 활용해 실시간 고성능 서비스를 설계하고 구축합니다. 이번 글에서 소개한 WebSocket 알림 시스템은 실제 프로덕션 환경에서 검증된 패턴을 바탕으로 작성되었습니다. 실시간 시스템 구축이나 서비스 아키텍처 설계에 대한 문의는 코드벤터로 연락 주세요.

코드픽 - 외주 전문 AI 바이브 코딩 글로벌 진출

댓글 남기기