ارسال یک پیام به Kafka با FastAPI

  • مدرس : علی بیگدلی
  • تاریخ انتشار: 1404/06/25
  • تعداد بازدید: 2

می‌توان با استفاده از FastAPI و kafka-python یک API ساخت که پیام‌ها را از کاربر دریافت کرده و به Topic در Kafka ارسال کند. در ادامه نمونه ساده‌ای از این فرآیند ارائه شده است.

نمونه کد FastAPI برای ارسال پیام:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from kafka import KafkaProducer

app = FastAPI()

# تعریف مدل پیام
class Message(BaseModel):
    content: str

# اتصال به Kafka
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: v.encode('utf-8')
)

topic_name = 'fastapi-topic'

# مسیر ارسال پیام
@app.post("/send")
async def send_message(message: Message):
    try:
        producer.send(topic_name, value=message.content)
        producer.flush()
        return {"status": "موفق", "message": message.content}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

توضیح کد:

  • Message(BaseModel): مدل داده‌ای برای دریافت پیام از درخواست HTTP.
  • KafkaProducer: اتصال به Kafka و تعیین Topic و سریالایزر پیام.
  • @app.post("/send/"): مسیر API که پیام را دریافت کرده و به Kafka ارسال می‌کند.
  • producer.send و producer.flush: ارسال پیام و اطمینان از تحویل آن.

با این روش می‌توان یک سرویس FastAPI داشت که درخواست‌های HTTP را به پیام‌های Kafka تبدیل کرده و برای مصرف‌کننده‌ها ارسال می‌کند.

ثبت دیدگاه


نکته: آدرس ایمیل شما منتشر نخواهد شد

دیدگاه کاربران (0)


هیچ دیدگاهی ثبت نشده است. می‌توانید اولین نفر باشید.