
Python의 asyncio는 단순한 라이브러리가 아닙니다. I/O 바운드 작업이 많은 웹 서비스에서 수천 개의 동시 요청을 효율적으로 처리하기 위한 핵심 패러다임입니다. FastAPI가 빠른 이유도 결국 asyncio 위에 서 있기 때문입니다. 이 글에서는 asyncio의 핵심 개념부터 FastAPI에서 실제로 자주 마주치는 비동기 패턴과 함정까지, 실전 코드와 함께 정리합니다.
1. asyncio의 핵심 — 이벤트 루프와 코루틴
asyncio의 핵심은 단일 스레드에서 여러 작업을 번갈아 실행하는 이벤트 루프입니다. CPU가 I/O를 기다리는 시간을 낭비하지 않고 다른 작업을 처리합니다. 코루틴(coroutine)은 async def로 선언하고, await로 실행을 일시 중단합니다.
# 기본 코루틴과 이벤트 루프
import asyncio
async def fetch_data(name: str, delay: float) -> str:
print(f"{name} 시작")
await asyncio.sleep(delay) # I/O 대기 시뮬레이션
print(f"{name} 완료")
return f"{name} 결과"
async def main():
# 순차 실행 — 총 3초
# result1 = await fetch_data("작업A", 1)
# result2 = await fetch_data("작업B", 2)
# 병렬 실행 — 총 2초 (가장 긴 작업 기준)
results = await asyncio.gather(
fetch_data("작업A", 1),
fetch_data("작업B", 2),
fetch_data("작업C", 1.5),
)
print(results)
asyncio.run(main())
# 출력:
# 작업A 시작
# 작업B 시작
# 작업C 시작
# 작업A 완료 (1초 후)
# 작업C 완료 (1.5초 후)
# 작업B 완료 (2초 후)
asyncio.gather()는 여러 코루틴을 동시에 스케줄링합니다. 순차 실행이라면 1+2+1.5=4.5초가 걸릴 작업이 2초 안에 끝납니다. FastAPI가 높은 동시성을 보여주는 원리가 바로 이것입니다.
2. FastAPI에서 async/await — 언제 써야 하나
FastAPI는 async def와 일반 def 엔드포인트를 모두 지원합니다. 하지만 어느 것을 써야 하는지에 대한 오해가 많습니다.
핵심 원칙: I/O 바운드 작업(DB, HTTP 요청, 파일)은 async def. CPU 바운드 작업(이미지 처리, 암호화, 복잡한 계산)은 일반 def 또는 별도 프로세스풀.
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
import httpx
app = FastAPI()
# 비동기 DB 엔진 설정
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
pool_size=20,
max_overflow=0,
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ✅ 올바른 예 — DB 조회는 async def
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# ✅ 올바른 예 — 외부 API 호출은 async + httpx
@app.get("/external-data")
async def get_external():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data", timeout=10)
return response.json()
# ❌ 함정 — async def 안에서 동기 블로킹 함수 호출
@app.get("/bad-example")
async def bad_example():
import time
time.sleep(2) # 이벤트 루프 전체를 2초 동안 블록!
return {"error": "이러면 안 됩니다"}
3. 동시성 함정과 해결법
asyncio를 쓴다고 무조건 빨라지는 게 아닙니다. 가장 흔한 실수는 async def 안에서 블로킹 코드를 실행하는 것입니다. 이 경우 이벤트 루프 자체가 멈추고 모든 요청이 대기 상태가 됩니다.
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from fastapi import FastAPI
app = FastAPI()
thread_pool = ThreadPoolExecutor(max_workers=10)
process_pool = ProcessPoolExecutor(max_workers=4)
# ✅ 블로킹 I/O → run_in_executor로 스레드풀 위임
@app.get("/blocking-io")
async def blocking_io_endpoint():
loop = asyncio.get_event_loop()
# requests 같은 동기 라이브러리를 써야 할 때
result = await loop.run_in_executor(
thread_pool,
lambda: some_blocking_io_function()
)
return {"result": result}
# ✅ CPU 집약 작업 → ProcessPoolExecutor
@app.get("/cpu-intensive")
async def cpu_intensive_endpoint():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
process_pool,
heavy_computation, # PIL 이미지 처리, 암호화 등
some_argument
)
return {"result": result}
# ✅ asyncio.TaskGroup (Python 3.11+) — 에러 처리가 더 안전
@app.get("/parallel-requests")
async def parallel_requests():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch_user_data())
task2 = tg.create_task(fetch_order_data())
task3 = tg.create_task(fetch_notification_data())
# 하나라도 실패하면 ExceptionGroup 발생 → 나머지 자동 취소
return {
"user": task1.result(),
"orders": task2.result(),
"notifications": task3.result(),
}
4. 비동기 컨텍스트와 생명주기 관리
FastAPI의 lifespan 이벤트와 비동기 의존성 주입을 활용하면 DB 연결 풀, HTTP 클라이언트, Redis 커넥터 등을 효율적으로 관리할 수 있습니다.
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import httpx
import redis.asyncio as aioredis
# 전역 리소스
http_client: httpx.AsyncClient = None
redis_client: aioredis.Redis = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# 시작 시 초기화
global http_client, redis_client
http_client = httpx.AsyncClient(timeout=30, limits=httpx.Limits(max_connections=100))
redis_client = await aioredis.from_url("redis://localhost", encoding="utf-8", decode_responses=True)
print("리소스 초기화 완료")
yield # 애플리케이션 실행
# 종료 시 정리
await http_client.aclose()
await redis_client.close()
print("리소스 정리 완료")
app = FastAPI(lifespan=lifespan)
# 비동기 의존성 — DB 세션
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# 캐시 레이어와 DB 조회 조합
@app.get("/products/{product_id}")
async def get_product(product_id: int, db: AsyncSession = Depends(get_db)):
# Redis 캐시 확인
cache_key = f"product:{product_id}"
cached = await redis_client.get(cache_key)
if cached:
return json.loads(cached)
# DB 조회
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if product:
# 캐시 저장 (5분)
await redis_client.setex(cache_key, 300, json.dumps(product.to_dict()))
return product
5. asyncio.Semaphore로 동시성 제한
외부 API에 너무 많은 요청을 동시에 보내면 rate limit에 걸립니다. asyncio.Semaphore로 동시 실행 수를 제어할 수 있습니다.
import asyncio
import httpx
# 동시 5개까지만 허용
semaphore = asyncio.Semaphore(5)
async def fetch_with_limit(client: httpx.AsyncClient, url: str) -> dict:
async with semaphore: # 5개 초과 시 여기서 대기
response = await client.get(url)
return response.json()
async def batch_fetch(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient() as client:
tasks = [fetch_with_limit(client, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 실패한 요청 필터링
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
if failed:
print(f"실패: {len(failed)}건")
return successful
# FastAPI 엔드포인트에서 활용
@app.post("/batch-process")
async def batch_process(items: list[str]):
results = await batch_fetch(items)
return {"processed": len(results), "results": results}
6. 실전 성능 팁 — asyncio 디버깅
asyncio 코드의 성능 문제를 진단할 때는 PYTHONASYNCIODEBUG=1 환경변수와 asyncio.set_debug(True)를 활용하세요. 느린 코루틴(기본값 100ms 이상)을 자동으로 경고합니다.
import asyncio
import logging
import time
# 디버그 모드 활성화
asyncio.set_debug(True)
logging.basicConfig(level=logging.DEBUG)
# 실행 시간 측정 데코레이터
def async_timer(func):
async def wrapper(*args, **kwargs):
start = time.perf_counter()
result = await func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000
print(f"[TIMER] {func.__name__}: {elapsed:.2f}ms")
return result
return wrapper
@async_timer
async def slow_db_query():
await asyncio.sleep(0.5) # 500ms 쿼리 시뮬레이션
return {"data": "result"}
# uvicorn 실행 시 reload=True면 이벤트 루프가 재생성됨에 주의
# production: uvicorn main:app --workers 4 --loop uvloop
# uvloop 설치: pip install uvloop
# uvloop은 asyncio보다 2-4배 빠른 C 기반 이벤트 루프
asyncio 핵심 정리
- async def는 I/O 바운드 작업에, 블로킹 코드는
run_in_executor로 - asyncio.gather()로 독립적인 작업을 병렬화, 에러 처리는
return_exceptions=True - TaskGroup(Python 3.11+)은 구조적 동시성으로 안전한 취소 보장
- Semaphore로 외부 API rate limit 및 DB 커넥션 풀 한계 대응
- lifespan으로 HTTP 클라이언트, Redis 연결을 앱 수명주기에 맞게 관리
- production에서는 uvloop 설치로 이벤트 루프 성능 2-4배 향상
코드벤터는 FastAPI + asyncio 기반의 고성능 비동기 백엔드 설계와 구축 경험을 보유하고 있습니다. 글로벌 협력 네트워크를 바탕으로 스타트업부터 엔터프라이즈까지 실전에서 검증된 비동기 아키텍처를 제공합니다. 기술 스택 선택부터 성능 최적화까지, 코드벤터와 함께라면 더 빠르고 안정적인 서비스를 만들 수 있습니다.



