이 글은 FastAPI와 Celery를 이용한 이메일 전송 시스템을 설계하며 고민했던
비동기 처리 구조, Redis 큐 분리, asyncio 사용 이유에 대해 정리한 기록입니다.
🔗Github Repository
https://github.com/kgh-codingWorld/email-server.git
개요
홈페이지에서 사용자가 'Contact Us'를 통해 이메일을 보내면,
지정된 담당자에게 메일이 전달되는 비동기 이메일 전송 서버를 구현했다.
초기에는 Redis 리스트큐(rpush
/ blpop
) 방식으로 직접 큐를 관리하려 했지만,
이후 구조를 단순화하고 안정성 확보를 위하여 Celery의 내장 메시지 브로커 기능을 활용하는 방향으로 전환하였다.
구조 설계 목표
- 이메일 전송은 비동기 큐로 분리
- 사용자 응답은 즉시, 작업은 큐에서 처리
- 큐 병목 해결을 위한 샤딩/스케일링 가능성 고려
- FastAPI 기반의 비동기 최적화
아키텍처
전체 동작 흐름
1. 사용자 요청 → 이메일 전송 요청 API 호출(/received)
2. FastAPI 서버는 요청을 Celery 작업 큐에 등록하고 task_id 반환
3. Celery 워커는 비동기로 이메일 전송을 처리
4. 이메일 전송 결과를 Redis 상태 저장소(redis-store)에 저장
5. 사용자는 task_id로 결과 조회 가능
Email 요청 흐름
@router.post("/received)
async def email_received(request: EmailRequest):
task_id = str(uuid.uuid4())
process_email.apply_async(args=[request.model_dump(), task_id], queue="celery")
return {"task_id":task_id, "status":"queued"}
- 클라이언트는 /received로 이메일 전송 요청을 보냄
- 요청에는 to, subject, body, username을 포함
- EmailRequest Pydantic 모델로 검증
- UUID 기반 task_id 생성
- Celery 태스크 process_email()에 요청 데이터를 비동기로 큐잉(작업들을 자료구조인 큐에 넣고 FIFO 방식으로 처리하는 구조)
- process_email()이라는 Celery 태스크에
- [이메일 내용, task_id]를 인자로 넣어
- "celery"라는 큐에 백그라운드 실행 요청 등록
- -------apply_async()를 호출하면 일어나는 일------
- Celery는 args를 직렬화함(예를 들어 JSON)
- Redis 브로커에 이 데이터를 저장함
- Celery 워커가 이 메시지를 감시하다가 꺼내서 실행함
- 사용자에게 task_id 반환
비동기 백그라운드 작업 Celery Worker(process_email())
@CELERY.task(bind=True, max_retries=3)
def process_email(self, email_data:dict, task_id:str):
- Celery가 Redis 브로커를 통해 이 태스크를 가져와 실행
- 이메일 전처리 → 이메일 전송 → 결과 Redis에 저장
1) 전처리: preprocess_email()
2) 메일 전송: send_email_async()
- aiosmtplib 사용으로 실제 메일 전송(비동기)
- 전송 성공 시 Redis 상태 저장소에 status: success로 저장
결과 조회
@router.get("/result")
async def get_email_result(task_id: str):
- 클라이언트가 /api/v1/email/result?task_id=... 호출
- redis-store에서 email_response:{task_id} 키로 전송 결과 조회
Redis 구성
역할 | 컨테이너 이름 | 포트 | 환경변수 키 |
요청 큐 (브로커) | redis-broker | 6379 | REDIS_BROKER_URL |
응답 큐 (저장소) | redis-store | 6380 | REDIS_URL |
Celery 구성
CELERY = Celery("email_worker", broker=os.getenv("REDIS_BROKER_URL"))
CELERY.autodiscover_tasks(["server.utils"])
- Celery는 email_worker라는 이름으로 Redis 브로커와 연결(워커인 Celery가 task를 처리할 수 있게 됨)
asyncio와 await를 사용한 이유
- FastAPI는 asyncio 기반 비동기 프레임워크
- await 없이는 실제 코루틴이 실행되지 않음
- 스레드보다 비용 낮고, 비동기 백그라운드 처리에 적합
설계 아이디어(미구현 구조)
아래 내용은 현재 구현에 포함되지는 않지만, 향후 확장을 고려하여 구상했던 설계이다.
- 사용자 단위 큐 샤딩 → 사용자를 해시 or 등급별로 분리하여 장애 범위 축소
- Auto Scaling 연동 → 큐 길이, CPU 사용량 기반으로 워커 수 자동 조절
- Response Queue 확장 → 요청-응답 쌍을 다중 Queue로 관리하여 처리 효율 증가
현재 구조는 이메일 전송을 비동기적으로 처리하고, 요청자는 빠르게 응답을 받을 수 있는 환경을 제공합니다.
설계적으로는 사용자 단위 큐 샤딩, 자동 확장 등도 고려하고 있으며,
향후 트래픽 증가나 중요도 높은 작업에 따라 점진적 고도화가 가능한 구조입니다.