Skip to main content
Python asyncio 완전 정복 — FastAPI 비동기 패턴

Python의 asyncio는 단순한 라이브러리가 아닙니다. I/O 바운드 작업이 많은 웹 서비스에서 수천 개의 동시 요청을 효율적으로 처리하기 위한 핵심 패러다임입니다. FastAPI가 빠른 이유도 결국 asyncio 위에 서 있기 때문입니다. 이 글에서는 asyncio의 핵심 개념부터 FastAPI에서 실제로 자주 마주치는 비동기 패턴과 함정까지, 실전 코드와 함께 정리합니다.

1. asyncio의 핵심 — 이벤트 루프와 코루틴

asyncio의 핵심은 단일 스레드에서 여러 작업을 번갈아 실행하는 이벤트 루프입니다. CPU가 I/O를 기다리는 시간을 낭비하지 않고 다른 작업을 처리합니다. 코루틴(coroutine)은 async def로 선언하고, await로 실행을 일시 중단합니다.

# 기본 코루틴과 이벤트 루프
import asyncio

async def fetch_data(name: str, delay: float) -> str:
    print(f"{name} 시작")
    await asyncio.sleep(delay)  # I/O 대기 시뮬레이션
    print(f"{name} 완료")
    return f"{name} 결과"

async def main():
    # 순차 실행 — 총 3초
    # result1 = await fetch_data("작업A", 1)
    # result2 = await fetch_data("작업B", 2)

    # 병렬 실행 — 총 2초 (가장 긴 작업 기준)
    results = await asyncio.gather(
        fetch_data("작업A", 1),
        fetch_data("작업B", 2),
        fetch_data("작업C", 1.5),
    )
    print(results)

asyncio.run(main())
# 출력:
# 작업A 시작
# 작업B 시작
# 작업C 시작
# 작업A 완료  (1초 후)
# 작업C 완료  (1.5초 후)
# 작업B 완료  (2초 후)

asyncio.gather()는 여러 코루틴을 동시에 스케줄링합니다. 순차 실행이라면 1+2+1.5=4.5초가 걸릴 작업이 2초 안에 끝납니다. FastAPI가 높은 동시성을 보여주는 원리가 바로 이것입니다.

2. FastAPI에서 async/await — 언제 써야 하나

FastAPI는 async def와 일반 def 엔드포인트를 모두 지원합니다. 하지만 어느 것을 써야 하는지에 대한 오해가 많습니다.

핵심 원칙: I/O 바운드 작업(DB, HTTP 요청, 파일)은 async def. CPU 바운드 작업(이미지 처리, 암호화, 복잡한 계산)은 일반 def 또는 별도 프로세스풀.

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
import httpx

app = FastAPI()

# 비동기 DB 엔진 설정
engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/dbname",
    pool_size=20,
    max_overflow=0,
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# ✅ 올바른 예 — DB 조회는 async def
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

# ✅ 올바른 예 — 외부 API 호출은 async + httpx
@app.get("/external-data")
async def get_external():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data", timeout=10)
        return response.json()

# ❌ 함정 — async def 안에서 동기 블로킹 함수 호출
@app.get("/bad-example")
async def bad_example():
    import time
    time.sleep(2)  # 이벤트 루프 전체를 2초 동안 블록!
    return {"error": "이러면 안 됩니다"}

3. 동시성 함정과 해결법

asyncio를 쓴다고 무조건 빨라지는 게 아닙니다. 가장 흔한 실수는 async def 안에서 블로킹 코드를 실행하는 것입니다. 이 경우 이벤트 루프 자체가 멈추고 모든 요청이 대기 상태가 됩니다.

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from fastapi import FastAPI

app = FastAPI()
thread_pool = ThreadPoolExecutor(max_workers=10)
process_pool = ProcessPoolExecutor(max_workers=4)

# ✅ 블로킹 I/O → run_in_executor로 스레드풀 위임
@app.get("/blocking-io")
async def blocking_io_endpoint():
    loop = asyncio.get_event_loop()
    # requests 같은 동기 라이브러리를 써야 할 때
    result = await loop.run_in_executor(
        thread_pool,
        lambda: some_blocking_io_function()
    )
    return {"result": result}

# ✅ CPU 집약 작업 → ProcessPoolExecutor
@app.get("/cpu-intensive")
async def cpu_intensive_endpoint():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        process_pool,
        heavy_computation,  # PIL 이미지 처리, 암호화 등
        some_argument
    )
    return {"result": result}

# ✅ asyncio.TaskGroup (Python 3.11+) — 에러 처리가 더 안전
@app.get("/parallel-requests")
async def parallel_requests():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_user_data())
        task2 = tg.create_task(fetch_order_data())
        task3 = tg.create_task(fetch_notification_data())
    # 하나라도 실패하면 ExceptionGroup 발생 → 나머지 자동 취소
    return {
        "user": task1.result(),
        "orders": task2.result(),
        "notifications": task3.result(),
    }

4. 비동기 컨텍스트와 생명주기 관리

FastAPI의 lifespan 이벤트와 비동기 의존성 주입을 활용하면 DB 연결 풀, HTTP 클라이언트, Redis 커넥터 등을 효율적으로 관리할 수 있습니다.

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import httpx
import redis.asyncio as aioredis

# 전역 리소스
http_client: httpx.AsyncClient = None
redis_client: aioredis.Redis = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 시작 시 초기화
    global http_client, redis_client
    http_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=100))
    redis_client = await aioredis.from_url("redis://localhost", encoding="utf-8", decode_responses=True)
    print("리소스 초기화 완료")
    
    yield  # 애플리케이션 실행
    
    # 종료 시 정리
    await http_client.aclose()
    await redis_client.close()
    print("리소스 정리 완료")

app = FastAPI(lifespan=lifespan)

# 비동기 의존성 — DB 세션
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# 캐시 레이어와 DB 조회 조합
@app.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession = Depends(get_db)):
    # Redis 캐시 확인
    cache_key = f"product:{product_id}"
    cached = await redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # DB 조회
    result = await db.execute(select(Product).where(Product.id == product_id))
    product = result.scalar_one_or_none()
    
    if product:
        # 캐시 저장 (5분)
        await redis_client.setex(cache_key, 300, json.dumps(product.to_dict()))
    
    return product

5. asyncio.Semaphore로 동시성 제한

외부 API에 너무 많은 요청을 동시에 보내면 rate limit에 걸립니다. asyncio.Semaphore로 동시 실행 수를 제어할 수 있습니다.

import asyncio
import httpx

# 동시 5개까지만 허용
semaphore = asyncio.Semaphore(5)

async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> dict:
    async with semaphore:  # 5개 초과 시 여기서 대기
        response = await client.get(url)
        return response.json()

async def batch_fetch(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_limit(client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 실패한 요청 필터링
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    
    if failed:
        print(f"실패: {len(failed)}건")
    
    return successful

# FastAPI 엔드포인트에서 활용
@app.post("/batch-process")
async def batch_process(items: list[str]):
    results = await batch_fetch(items)
    return {"processed": len(results), "results": results}

6. 실전 성능 팁 — asyncio 디버깅

asyncio 코드의 성능 문제를 진단할 때는 PYTHONASYNCIODEBUG=1 환경변수와 asyncio.set_debug(True)를 활용하세요. 느린 코루틴(기본값 100ms 이상)을 자동으로 경고합니다.

import asyncio
import logging
import time

# 디버그 모드 활성화
asyncio.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

# 실행 시간 측정 데코레이터
def async_timer(func):
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await func(*args, **kwargs)
        elapsed = (time.perf_counter() - start) * 1000
        print(f"[TIMER] {func.__name__}: {elapsed:.2f}ms")
        return result
    return wrapper

@async_timer
async def slow_db_query():
    await asyncio.sleep(0.5)  # 500ms 쿼리 시뮬레이션
    return {"data": "result"}

# uvicorn 실행 시 reload=True면 이벤트 루프가 재생성됨에 주의
# production: uvicorn main:app --workers 4 --loop uvloop
# uvloop 설치: pip install uvloop
# uvloop은 asyncio보다 2-4배 빠른 C 기반 이벤트 루프

asyncio 핵심 정리

  • async def는 I/O 바운드 작업에, 블로킹 코드는 run_in_executor
  • asyncio.gather()로 독립적인 작업을 병렬화, 에러 처리는 return_exceptions=True
  • TaskGroup(Python 3.11+)은 구조적 동시성으로 안전한 취소 보장
  • Semaphore로 외부 API rate limit 및 DB 커넥션 풀 한계 대응
  • lifespan으로 HTTP 클라이언트, Redis 연결을 앱 수명주기에 맞게 관리
  • production에서는 uvloop 설치로 이벤트 루프 성능 2-4배 향상

코드벤터는 FastAPI + asyncio 기반의 고성능 비동기 백엔드 설계와 구축 경험을 보유하고 있습니다. 글로벌 협력 네트워크를 바탕으로 스타트업부터 엔터프라이즈까지 실전에서 검증된 비동기 아키텍처를 제공합니다. 기술 스택 선택부터 성능 최적화까지, 코드벤터와 함께라면 더 빠르고 안정적인 서비스를 만들 수 있습니다.

코드픽 - 외주 전문 AI 바이브 코딩 글로벌 진출

댓글 남기기