برای دریافت پیامها از Kafka، میتوان از کتابخانه kafka-python
و کلاس KafkaConsumer
استفاده کرد. در ادامه نمونه کد یک Consumer ساده ارائه شده است.
نمونه کد Consumer ساده:
from kafka import KafkaConsumer
# اتصال به Topic
topic_name = 'my-topic'
consumer = KafkaConsumer(
topic_name,
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest', # خواندن پیامها از ابتدا
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda v: v.decode('utf-8') # تبدیل پیام از بایت به رشته
)
print("در حال دریافت پیامها...")
for message in consumer:
print(f"پیام دریافت شد: {message.value}")
توضیح کد:
- KafkaConsumer: کلاس اصلی برای ایجاد Consumer و اتصال به Topic.
- bootstrap_servers: آدرس و پورت Broker که Consumer به آن متصل میشود.
- auto_offset_reset: مشخص میکند که پیامها از ابتدا یا انتهای Topic خوانده شوند.
- enable_auto_commit: مشخص میکند که Offsetها به صورت خودکار ذخیره شوند.
- group_id: گروه مصرفکننده که برای مدیریت بار بین چند Consumer استفاده میشود.
- value_deserializer: نحوه تبدیل پیام از بایت به رشته برای پردازش آسانتر.