در سیستمهای بلادرنگ، احتمال بروز خطا هنگام ارسال یا دریافت پیامها وجود دارد. Kafka و پایتون ابزارهایی برای مدیریت خطا و اجرای Retry ساده فراهم میکنند تا پیامها از دست نروند.
۱. مدیریت خطا هنگام ارسال پیام (Producer)
from kafka import KafkaProducer
import time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: v.encode('utf-8')
)
topic_name = 'retry-topic'
messages = ['پیام ۱', 'پیام ۲', 'پیام ۳']
for msg in messages:
success = False
retries = 3
while not success and retries > 0:
try:
producer.send(topic_name, value=msg)
producer.flush()
print(f"پیام ارسال شد: {msg}")
success = True
except Exception as e:
print(f"خطا هنگام ارسال پیام: {e}, تلاش مجدد...")
retries -= 1
time.sleep(1)
۲. مدیریت خطا هنگام مصرف پیام (Consumer)
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'retry-topic',
bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
enable_auto_commit=False, # کنترل commit دستی برای Retry
group_id='retry-group',
value_deserializer=lambda v: v.decode('utf-8')
)
for message in consumer:
try:
print(f"پردازش پیام: {message.value}")
# فرض کنید پردازش ممکن است خطا ایجاد کند
if "۲" in message.value:
raise ValueError("خطای نمونه در پردازش")
# تایید پردازش موفق
consumer.commit()
except Exception as e:
print(f"خطا در پردازش پیام: {e}, تلاش بعدی")
توضیح فرآیند:
- برای Producer، میتوان با حلقه Retry تلاش مجدد برای ارسال پیام انجام داد.
- برای Consumer، با غیرفعال کردن
auto_commit
و commit دستی میتوان پیامهایی که خطا داشتند را مجدداً خواند. - این روش سادهترین شکل مدیریت خطا و Retry در Kafka است و برای پروژههای کوچک و متوسط کافی میباشد.