میتوان با استفاده از FastAPI و kafka-python یک API ساخت که پیامها را از کاربر دریافت کرده و به Topic در Kafka ارسال کند. در ادامه نمونه سادهای از این فرآیند ارائه شده است.
نمونه کد FastAPI برای ارسال پیام:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from kafka import KafkaProducer
app = FastAPI()
# تعریف مدل پیام
class Message(BaseModel):
content: str
# اتصال به Kafka
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: v.encode('utf-8')
)
topic_name = 'fastapi-topic'
# مسیر ارسال پیام
@app.post("/send")
async def send_message(message: Message):
try:
producer.send(topic_name, value=message.content)
producer.flush()
return {"status": "موفق", "message": message.content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
توضیح کد:
- Message(BaseModel): مدل دادهای برای دریافت پیام از درخواست HTTP.
- KafkaProducer: اتصال به Kafka و تعیین Topic و سریالایزر پیام.
- @app.post("/send/"): مسیر API که پیام را دریافت کرده و به Kafka ارسال میکند.
- producer.send و producer.flush: ارسال پیام و اطمینان از تحویل آن.
با این روش میتوان یک سرویس FastAPI داشت که درخواستهای HTTP را به پیامهای Kafka تبدیل کرده و برای مصرفکنندهها ارسال میکند.