در این مرحله، با استفاده از Producer و Consumer پایتون، پیامهای واقعی بین Topic رد و بدل میشوند. این مثال شبیهسازی یک سناریوی ساده پیامرسانی بلادرنگ است.
۱. ارسال پیام با Producer
ابتدا پیامهایی مانند وضعیت یک کاربر یا دادههای حسگر را به Topic ارسال میکنیم:
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: v.encode('utf-8')
)
topic_name = 'sensor-data'
# ارسال چند پیام با فاصله زمانی
for i in range(5):
message = f"داده حسگر شماره {i}"
producer.send(topic_name, value=message)
print(f"پیام ارسال شد: {message}")
time.sleep(1)
producer.flush()
۲. دریافت پیام با Consumer
سپس Consumer پیامها را از Topic میخواند و پردازش میکند:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'sensor-data',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='sensor-group',
value_deserializer=lambda v: v.decode('utf-8')
)
print("در حال دریافت پیامها از Topic 'sensor-data'...")
for message in consumer:
print(f"پیام دریافت شد: {message.value}")
توضیح فرآیند:
- Producer پیامها را به Topic
sensor-data
ارسال میکند. - Kafka پیامها را در Partition ذخیره میکند و شماره Offset به هر پیام اختصاص میدهد.
- Consumer پیامها را از ابتدا (یا از Offset مشخص) میخواند و میتواند آنها را پردازش یا ذخیره کند.
این روش برای پیادهسازی سیستمهای بلادرنگ، جمعآوری دادههای حسگر، لاگها و سایر کاربردهای Event Streaming بسیار مناسب است.