Development Artist

[FastAPI] ๋น„๋™๊ธฐ๋กœ RabbitMQ ์—ฐ๊ฒฐํ•˜๊ธฐ ๋ณธ๋ฌธ

Research/Devops

[FastAPI] ๋น„๋™๊ธฐ๋กœ RabbitMQ ์—ฐ๊ฒฐํ•˜๊ธฐ

JMcunst 2025. 2. 9. 20:56
728x90
๋ฐ˜์‘ํ˜•

๐Ÿ”ฅ ๊ฐœ์š”

FastAPI์™€ RabbitMQ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ ๋ฉ”์‹œ์ง€ ์†Œ๋น„ ์‹œ์Šคํ…œ์„ ๊ตฌ์ถ•ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•œ๋‹ค. RabbitMQ๋Š” ๋ฉ”์‹œ์ง€ ํ๋ฅผ ์ œ๊ณตํ•˜์—ฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐ„์˜ ๋น„๋™๊ธฐ ํ†ต์‹ ์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•œ๋‹ค. ์ด๋ฅผ FastAPI์™€ ๊ฒฐํ•ฉํ•˜๋ฉด ๊ณ ์„ฑ๋Šฅ์˜ ๋น„๋™๊ธฐ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์„œ๋น„์Šค๋ฅผ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” FastAPI ์„œ๋ฒ„์™€ RabbitMQ๋ฅผ ์—ฐ๊ฒฐํ•˜๋Š” ๋ฐฉ๋ฒ•, ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๋Š” ๋ฐฉ์‹, ๋น„๋™๊ธฐ์ ์œผ๋กœ ์™ธ๋ถ€ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๊ฐ„๋žตํ•˜๊ฒŒ ๊ธฐ์ˆ ํ•œ๋‹ค.


๐Ÿ“Œ RabbitMQ๋ž€?

RabbitMQ๋Š” AMQP(Advanced Message Queuing Protocol) ๊ธฐ๋ฐ˜์˜ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋กœ, ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ์ €์žฅํ•˜๊ณ  ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋ถ„์‚ฐ ์‹œ์Šคํ…œ ๊ฐ„ ๋ฐ์ดํ„ฐ๋ฅผ ์ „๋‹ฌํ•˜๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

RabbitMQ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ด์ ์ด ์žˆ๋‹ค:

  • ๋น„๋™๊ธฐ ๋ฉ”์‹œ์ง•: ์ž‘์—…์„ ๋‹ค๋ฅธ ์„œ๋น„์Šค๋กœ ๋„˜๊ธฐ๊ณ  ๋น„๋™๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ ๊ฐ€๋Šฅ
  • ๋ถ€ํ•˜ ๋ถ„์‚ฐ: ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์†Œ๋น„์ž(Consumer)๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋‚˜๋ˆ„์–ด ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
  • ์žฅ์•  ๋Œ€์‘: ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ์ €์žฅํ•˜์—ฌ ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋ณต๊ตฌ ๊ฐ€๋Šฅ
  • ํ™•์žฅ์„ฑ: ๋‹ค์–‘ํ•œ ์„œ๋น„์Šค์™€ ํ†ตํ•ฉ ๊ฐ€๋Šฅ (Django, FastAPI, Node.js ๋“ฑ)

๐Ÿ›  RabbitMQ + FastAPI ํ™˜๊ฒฝ ๊ตฌ์ถ•

1๏ธโƒฃ RabbitMQ ์„ค์น˜ ๋ฐ ์‹คํ–‰

RabbitMQ๋Š” Docker๋ฅผ ์ด์šฉํ•˜์—ฌ ๊ฐ„๋‹จํžˆ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3-management

RabbitMQ ๋Œ€์‹œ๋ณด๋“œ์— ์ ‘์†ํ•˜์—ฌ ํ ์ƒํƒœ๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

  • URL: http://localhost:15672/
  • ๋กœ๊ทธ์ธ ์ •๋ณด: admin / admin

2๏ธโƒฃ FastAPI ํ”„๋กœ์ ํŠธ ์ƒ์„ฑ

mkdir fastapi-rabbitmq && cd fastapi-rabbitmq
python -m venv venv
source venv/bin/activate  # (Windows์˜ ๊ฒฝ์šฐ venv\Scripts\activate)
pip install fastapi uvicorn aio_pika

3๏ธโƒฃ FastAPI ์„œ๋ฒ„ ์„ค์ • (main.py)

import asyncio
import signal
from fastapi import FastAPI
from contextlib import asynccontextmanager

IS_PROCESSING = False

def handle_exit_signal(loop):
    def _handler(signal_received, frame):
        print(f"Received exit signal: {signal_received}. Initiating shutdown...")
        loop.create_task(shutdown_handler())

    signal.signal(signal.SIGTERM, _handler)
    signal.signal(signal.SIGINT, _handler)

async def shutdown_handler():
    global IS_PROCESSING
    while IS_PROCESSING:
        print("Waiting for Blender process to complete...")
        await asyncio.sleep(1)
    print("Blender process completed. Shutting down...")

@asynccontextmanager
async def lifespan(app: FastAPI):
    loop = asyncio.get_event_loop()
    handle_exit_signal(loop)
    connection = await connect_to_rabbitmq(loop)
    try:
        yield
    finally:
        await shutdown_handler()
        await connection.close()

app = FastAPI(lifespan=lifespan)

์ด์ œ FastAPI๊ฐ€ ์‹คํ–‰๋  ๋•Œ RabbitMQ์™€ ์—ฐ๊ฒฐํ•˜๊ณ , ์ข…๋ฃŒ๋  ๋•Œ ์—ฐ๊ฒฐ์„ ๋‹ซ๋„๋ก ์„ค์ •.


๐Ÿ—๏ธ RabbitMQ ์—ฐ๊ฒฐ ๋ฐ ๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž (Consumer) ์„ค์ •

4๏ธโƒฃ RabbitMQ ์—ฐ๊ฒฐ ๋ฐ ๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž ๋“ฑ๋ก

import asyncio
from aio_pika import connect_robust, IncomingMessage

async def process_message(message: IncomingMessage):
    """RabbitMQ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋น„๋™๊ธฐ ํ•จ์ˆ˜"""
    global IS_PROCESSING
    async with processing_lock:
        try:
            IS_PROCESSING = True
            body = message.body.decode()
            print(f"Received message: {body}")
            await message.ack()
        except Exception as e:
            print(f"Error processing message: {e}")
            await message.nack(requeue=True)
        finally:
            IS_PROCESSING = False

async def connect_to_rabbitmq(loop):
    """RabbitMQ ์—ฐ๊ฒฐ ๋ฐ ์†Œ๋น„์ž ๋“ฑ๋ก"""
    connection = await connect_robust(settings.RABBITMQ_URL, loop=loop)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue("task_queue", durable=True)
    await queue.consume(process_message)
    return connection

5๏ธโƒฃ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์ž (Producer) ์„ค์ •

RabbitMQ ํ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ์ฝ”๋“œ.

import aio_pika
import asyncio

async def send_message(message: str):
    """RabbitMQ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ํ•จ์ˆ˜"""
    connection = await aio_pika.connect_robust("amqp://admin:admin@localhost/")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("task_queue", durable=True)
        await channel.default_exchange.publish(
            aio_pika.Message(body=message.encode()),
            routing_key=queue.name
        )

asyncio.run(send_message("Hello, RabbitMQ!"))

๐ŸŽฏ ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ์ด์œ 

โœ… ๋ธ”๋กœํ‚น ์—†์ด ๋™์‹œ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

  • aio_pika ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ RabbitMQ ๋ฉ”์‹œ์ง€๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์†Œ๋น„.
  • ๋ฉ”์‹œ์ง€๊ฐ€ ๋„์ฐฉํ•˜๋ฉด ์ฆ‰์‹œ ๋‹ค๋ฅธ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์–ด ์„ฑ๋Šฅ ํ–ฅ์ƒ.

โœ… ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ

  • ์—ฌ๋Ÿฌ ๊ฐœ์˜ Worker๊ฐ€ ๋™์ผํ•œ ํ๋ฅผ ๊ณต์œ ํ•˜์—ฌ ๋กœ๋“œ ๋ฐธ๋Ÿฐ์‹ฑ ๊ฐ€๋Šฅ.
  • ๋™์ผํ•œ task_queue์— ์—ฌ๋Ÿฌ Consumer๋ฅผ ์‹คํ–‰ํ•˜๋ฉด ์ž๋™์œผ๋กœ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ.

๐Ÿš€ ๊ฒฐ๋ก 

FastAPI์™€ RabbitMQ๋ฅผ ํ™œ์šฉํ•˜๋ฉด ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๊ณ , ์™ธ๋ถ€ ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•˜๋Š” ๊ฐ•๋ ฅํ•œ ์•„ํ‚คํ…์ฒ˜๋ฅผ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ด๋ฒˆ ๊ธ€์—์„œ ๋‹ค๋ฃฌ ์ฃผ์š” ๊ฐœ๋…์„ ์ •๋ฆฌํ•˜๋ฉด:

  • โœ… RabbitMQ ์„ค์น˜ ๋ฐ FastAPI ์—ฐ๋™
  • โœ… RabbitMQ Consumer(๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž) ๋“ฑ๋ก ๋ฐ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
  • โœ… ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜์—ฌ ์„ฑ๋Šฅ ํ–ฅ์ƒ
  • โœ… Producer๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ  Consumer๊ฐ€ ์ฒ˜๋ฆฌํ•˜๋„๋ก ๊ตฌ์„ฑ

๋„์›€์ด ๋˜๊ธธ ๋ฐ”๋ž€๋‹ค.

728x90
๋ฐ˜์‘ํ˜•
Comments