์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- ๋ถํ ์ ๋ณต
- vuejs
- DFS
- ์ฝ๋ฉํ ์คํธ
- issue
- codingtest
- cos pro
- ์๋๋ก์ด๋
- ๋์ ๊ณํ๋ฒ๊ณผ์ต๋จ๊ฑฐ๋ฆฌ์ญ์ถ์
- android
- DART
- ์๊ณ ๋ฆฌ์ฆ
- BAEKJOON
- django
- ์ฝํ
- ์ฝ๋ํ์์ด
- Python
- AndroidStudio
- ํ์ด์ฌ
- ์๋๋ก์ด๋์คํ๋์ค
- DFS์BFS
- ๊ฐ๋ฐ
- Flutter
- cos
- ๋์ ๊ณํ๋ฒ
- Algorithm
- ๋ฐฑ์ค
- Vue
- C++
- cos pro 1๊ธ
- Today
- Total
Development Artist
[FastAPI] ๋น๋๊ธฐ๋ก RabbitMQ ์ฐ๊ฒฐํ๊ธฐ ๋ณธ๋ฌธ
๐ฅ ๊ฐ์
FastAPI์ RabbitMQ๋ฅผ ํ์ฉํ์ฌ ๋น๋๊ธฐ ๋ฉ์์ง ์๋น ์์คํ ์ ๊ตฌ์ถํ๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํ๋ค. RabbitMQ๋ ๋ฉ์์ง ํ๋ฅผ ์ ๊ณตํ์ฌ ์ ํ๋ฆฌ์ผ์ด์ ๊ฐ์ ๋น๋๊ธฐ ํต์ ์ ๊ฐ๋ฅํ๊ฒ ํ๋ค. ์ด๋ฅผ FastAPI์ ๊ฒฐํฉํ๋ฉด ๊ณ ์ฑ๋ฅ์ ๋น๋๊ธฐ ๋ฉ์์ง ์ฒ๋ฆฌ ์๋น์ค๋ฅผ ๊ตฌํํ ์ ์๋ค.
์ด๋ฒ ํฌ์คํ ์์๋ FastAPI ์๋ฒ์ RabbitMQ๋ฅผ ์ฐ๊ฒฐํ๋ ๋ฐฉ๋ฒ, ๋ฉ์์ง๋ฅผ ์๋นํ๋ ๋ฐฉ์, ๋น๋๊ธฐ์ ์ผ๋ก ์ธ๋ถ ํ๋ก์ธ์ค๋ฅผ ์คํํ๋ ๋ฐฉ๋ฒ์ ๊ฐ๋ตํ๊ฒ ๊ธฐ์ ํ๋ค.
๐ RabbitMQ๋?
RabbitMQ๋ AMQP(Advanced Message Queuing Protocol) ๊ธฐ๋ฐ์ ๋ฉ์์ง ๋ธ๋ก์ปค๋ก, ๋ฉ์์ง๋ฅผ ํ์ ์ ์ฅํ๊ณ ๋น๋๊ธฐ์ ์ผ๋ก ๋ถ์ฐ ์์คํ ๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ๋ ์ญํ ์ ํ๋ค.
RabbitMQ๋ฅผ ์ฌ์ฉํ๋ฉด ๋ค์๊ณผ ๊ฐ์ ์ด์ ์ด ์๋ค:
- ๋น๋๊ธฐ ๋ฉ์์ง: ์์ ์ ๋ค๋ฅธ ์๋น์ค๋ก ๋๊ธฐ๊ณ ๋น๋๊ธฐ์ ์ผ๋ก ์คํ ๊ฐ๋ฅ
- ๋ถํ ๋ถ์ฐ: ์ฌ๋ฌ ๊ฐ์ ์๋น์(Consumer)๊ฐ ๋ฉ์์ง๋ฅผ ๋๋์ด ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ์ฅ์ ๋์: ๋ฉ์์ง๋ฅผ ํ์ ์ ์ฅํ์ฌ ์ฅ์ ๋ฐ์ ์ ๋ณต๊ตฌ ๊ฐ๋ฅ
- ํ์ฅ์ฑ: ๋ค์ํ ์๋น์ค์ ํตํฉ ๊ฐ๋ฅ (Django, FastAPI, Node.js ๋ฑ)
๐ RabbitMQ + FastAPI ํ๊ฒฝ ๊ตฌ์ถ
1๏ธโฃ RabbitMQ ์ค์น ๋ฐ ์คํ
RabbitMQ๋ Docker๋ฅผ ์ด์ฉํ์ฌ ๊ฐ๋จํ ์คํํ ์ ์๋ค.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3-management
RabbitMQ ๋์๋ณด๋์ ์ ์ํ์ฌ ํ ์ํ๋ฅผ ํ์ธํ ์ ์๋ค.
- URL:
http://localhost:15672/
- ๋ก๊ทธ์ธ ์ ๋ณด:
admin / admin
2๏ธโฃ FastAPI ํ๋ก์ ํธ ์์ฑ
mkdir fastapi-rabbitmq && cd fastapi-rabbitmq
python -m venv venv
source venv/bin/activate # (Windows์ ๊ฒฝ์ฐ venv\Scripts\activate)
pip install fastapi uvicorn aio_pika
3๏ธโฃ FastAPI ์๋ฒ ์ค์ (main.py
)
import asyncio
import signal
from fastapi import FastAPI
from contextlib import asynccontextmanager
IS_PROCESSING = False
def handle_exit_signal(loop):
def _handler(signal_received, frame):
print(f"Received exit signal: {signal_received}. Initiating shutdown...")
loop.create_task(shutdown_handler())
signal.signal(signal.SIGTERM, _handler)
signal.signal(signal.SIGINT, _handler)
async def shutdown_handler():
global IS_PROCESSING
while IS_PROCESSING:
print("Waiting for Blender process to complete...")
await asyncio.sleep(1)
print("Blender process completed. Shutting down...")
@asynccontextmanager
async def lifespan(app: FastAPI):
loop = asyncio.get_event_loop()
handle_exit_signal(loop)
connection = await connect_to_rabbitmq(loop)
try:
yield
finally:
await shutdown_handler()
await connection.close()
app = FastAPI(lifespan=lifespan)
์ด์ FastAPI๊ฐ ์คํ๋ ๋ RabbitMQ์ ์ฐ๊ฒฐํ๊ณ , ์ข ๋ฃ๋ ๋ ์ฐ๊ฒฐ์ ๋ซ๋๋ก ์ค์ .
๐๏ธ RabbitMQ ์ฐ๊ฒฐ ๋ฐ ๋ฉ์์ง ์๋น์ (Consumer) ์ค์
4๏ธโฃ RabbitMQ ์ฐ๊ฒฐ ๋ฐ ๋ฉ์์ง ์๋น์ ๋ฑ๋ก
import asyncio
from aio_pika import connect_robust, IncomingMessage
async def process_message(message: IncomingMessage):
"""RabbitMQ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ ๋น๋๊ธฐ ํจ์"""
global IS_PROCESSING
async with processing_lock:
try:
IS_PROCESSING = True
body = message.body.decode()
print(f"Received message: {body}")
await message.ack()
except Exception as e:
print(f"Error processing message: {e}")
await message.nack(requeue=True)
finally:
IS_PROCESSING = False
async def connect_to_rabbitmq(loop):
"""RabbitMQ ์ฐ๊ฒฐ ๋ฐ ์๋น์ ๋ฑ๋ก"""
connection = await connect_robust(settings.RABBITMQ_URL, loop=loop)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("task_queue", durable=True)
await queue.consume(process_message)
return connection
5๏ธโฃ ๋ฉ์์ง ๋ฐํ์ (Producer) ์ค์
RabbitMQ ํ์ ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ์ฝ๋.
import aio_pika
import asyncio
async def send_message(message: str):
"""RabbitMQ์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ํจ์"""
connection = await aio_pika.connect_robust("amqp://admin:admin@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("task_queue", durable=True)
await channel.default_exchange.publish(
aio_pika.Message(body=message.encode()),
routing_key=queue.name
)
asyncio.run(send_message("Hello, RabbitMQ!"))
๐ฏ ๋น๋๊ธฐ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ ์ด์
โ ๋ธ๋กํน ์์ด ๋์ ์ฒ๋ฆฌ ๊ฐ๋ฅ
aio_pika
๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ RabbitMQ ๋ฉ์์ง๋ฅผ ๋น๋๊ธฐ์ ์ผ๋ก ์๋น.- ๋ฉ์์ง๊ฐ ๋์ฐฉํ๋ฉด ์ฆ์ ๋ค๋ฅธ ์์ ์ ์ํํ ์ ์์ด ์ฑ๋ฅ ํฅ์.
โ ๋ณ๋ ฌ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ์ฌ๋ฌ ๊ฐ์ Worker๊ฐ ๋์ผํ ํ๋ฅผ ๊ณต์ ํ์ฌ ๋ก๋ ๋ฐธ๋ฐ์ฑ ๊ฐ๋ฅ.
- ๋์ผํ
task_queue
์ ์ฌ๋ฌ Consumer๋ฅผ ์คํํ๋ฉด ์๋์ผ๋ก ๋ถ์ฐ ์ฒ๋ฆฌ.
๐ ๊ฒฐ๋ก
FastAPI์ RabbitMQ๋ฅผ ํ์ฉํ๋ฉด ๋น๋๊ธฐ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์๋นํ๊ณ , ์ธ๋ถ ํ๋ก์ธ์ค๋ฅผ ์คํํ๋ ๊ฐ๋ ฅํ ์ํคํ ์ฒ๋ฅผ ๊ตฌ์ถํ ์ ์๋ค.
์ด๋ฒ ๊ธ์์ ๋ค๋ฃฌ ์ฃผ์ ๊ฐ๋ ์ ์ ๋ฆฌํ๋ฉด:
- โ RabbitMQ ์ค์น ๋ฐ FastAPI ์ฐ๋
- โ RabbitMQ Consumer(๋ฉ์์ง ์๋น์) ๋ฑ๋ก ๋ฐ ๋ฉ์์ง ์ฒ๋ฆฌ
- โ ๋น๋๊ธฐ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์๋นํ์ฌ ์ฑ๋ฅ ํฅ์
- โ Producer๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ณ Consumer๊ฐ ์ฒ๋ฆฌํ๋๋ก ๊ตฌ์ฑ
๋์์ด ๋๊ธธ ๋ฐ๋๋ค.
'Research > Devops' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Helm] Helm Chart ํ ํ๋ฆฟ ํ์ผ์ ๋ํ์ฌ - Loki Ingester StatefulSet (0) | 2025.02.08 |
---|---|
GluserFS์ ๋ํ์ฌ (0) | 2025.02.04 |
Kubernetes๋ฅผ ์ฌ์ฉํ๋ ์ด์ ๋๋ ์ด์ (0) | 2024.08.15 |
[AWS RDS, MySQL, DBeaver, Mac] Connection Timed Out (0) | 2023.07.09 |
[Centos 7, Postgresql 15] ์ค์น. ( version 11 or later ) (0) | 2022.12.29 |