新建项目
mkdir python-kafka-test
cd python-kafka-test
安装依赖
pip install confluent-kafka
创建配置文件
# Kafka configuration module.
#
# KAFKA_CONFIG carries the shared broker settings plus nested 'producer'
# and 'consumer' sections; the client scripts flatten the relevant section
# into their own config before constructing a Producer/Consumer.

# Producer-specific overrides.
_PRODUCER_OPTS = {
    'client.id': 'python-kafka-producer',
    'acks': 'all',                 # require acknowledgement from all replicas
    'retries': 3,                  # retry failed sends up to 3 times
    'retry.backoff.ms': 1000,      # wait 1s between retries
    'batch.size': 16384,           # max batch size in bytes
    'linger.ms': 5,                # small delay to let batches fill
    'compression.type': 'snappy',  # compress batches with snappy
}

# Consumer-specific overrides.
_CONSUMER_OPTS = {
    'group.id': 'notification-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000,
    'session.timeout.ms': 30000,
    'max.poll.interval.ms': 300000,
    'heartbeat.interval.ms': 10000,
}

# Shared settings plus the nested per-client sections.
KAFKA_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    'producer': _PRODUCER_OPTS,
    'consumer': _CONSUMER_OPTS,
}

# Logical topic keys mapped to the actual Kafka topic names.
TOPICS = {
    'email': 'email-topic',
    'sms': 'sms-topic',
}
创建Kafka生产者
import json
import logging
import signal
import sys
from confluent_kafka import Producer
from config import KAFKA_CONFIG, TOPICS
# Configure logging to stdout/stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger('kafka-producer')

# Build a flat producer configuration: shared settings overlaid with the
# producer-specific section, then strip the nested sections (librdkafka
# rejects unknown keys such as 'producer'/'consumer').
producer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('producer', {})}
for _section in ('producer', 'consumer'):
    producer_config.pop(_section, None)

# Single module-level Producer instance shared by all send calls.
p = Producer(producer_config)

# Guards against running the shutdown sequence twice.
shutting_down = False
def signal_handler(signum, frame):
    """Gracefully shut the producer down on SIGINT/SIGTERM (idempotent)."""
    global shutting_down
    if not shutting_down:
        shutting_down = True
        logger.info("接收到终止信号,正在优雅关闭...")
        # Wait at most 10 seconds for queued messages to be delivered.
        p.flush(10)
        logger.info("生产者已关闭")
        sys.exit(0)


# Install the handler for both interactive interrupts and kill signals.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, signal_handler)
def delivery_report(err, msg):
    """Per-message delivery callback, invoked from Producer.poll()/flush()."""
    if err is None:
        logger.info(f'消息已发送到 {msg.topic()} [分区 {msg.partition()}]')
    else:
        logger.error(f'消息发送失败: {err}')
def send_notification(topic_key, payload, key=None):
    """Queue a notification message on the topic mapped to topic_key.

    Args:
        topic_key: Key into TOPICS; an unknown key is used verbatim as the
            topic name.
        payload: Message body — a dict (JSON-encoded), str, or bytes.
        key: Optional message key (str or bytes).

    Returns:
        bool: True if the message was added to the send queue.
    """
    try:
        # Resolve the logical key to the actual topic name.
        topic = TOPICS.get(topic_key, topic_key)
        # Normalize the payload to bytes; dicts are serialized as JSON first.
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        # Fix: test `is not None` so an empty-string key is sent as an empty
        # key instead of being silently dropped (the old `if key` was falsy
        # for "").
        if key is not None and isinstance(key, str):
            key = key.encode('utf-8')
        p.produce(
            topic,
            payload,
            key=key,
            callback=delivery_report
        )
        # Serve pending delivery callbacks without blocking.
        p.poll(0)
        logger.info(f'消息已加入发送队列: {topic}')
        return True
    except Exception as e:
        logger.error(f'发送消息时出错: {e}')
        return False
# Usage example: queue one email and one SMS notification, then flush.
if __name__ == "__main__":
    try:
        # Sample email notification.
        email_payload = {
            "to": "receiver@example.com",
            "from": "sender@example.com",
            "subject": "Sample Email",
            "body": "This is a sample email notification",
        }
        send_notification('email', email_payload)

        # Sample SMS notification.
        sms_payload = {
            "phoneNumber": "1234567890",
            "message": "This is a sample SMS notification",
        }
        send_notification('sms', sms_payload)

        # Block until delivery reports arrive or the timeout expires.
        remaining = p.flush(timeout=5)
        if remaining > 0:
            logger.warning(f'仍有 {remaining} 条消息未发送完成')
        else:
            logger.info('所有消息已成功发送')
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # Last-chance flush so queued messages are not lost on exit.
        p.flush(timeout=5)
创建Kafka消费者
import json
import logging
import signal
import sys
from confluent_kafka import Consumer, KafkaError
from config import KAFKA_CONFIG, TOPICS
# Configure logging to stdout/stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger('kafka-consumer')

# Build a flat consumer configuration: shared settings overlaid with the
# consumer-specific section, then strip the nested sections (librdkafka
# rejects unknown keys such as 'producer'/'consumer').
consumer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('consumer', {})}
for _section in ('producer', 'consumer'):
    consumer_config.pop(_section, None)

# Single module-level Consumer instance used by main().
c = Consumer(consumer_config)

# Guards against running the shutdown sequence twice.
shutting_down = False
def signal_handler(signum, frame):
    """Request a graceful consumer shutdown on SIGINT/SIGTERM (idempotent).

    Raising SystemExit here unwinds through the poll loop so main()'s
    finally block can close the consumer.
    """
    global shutting_down
    if not shutting_down:
        shutting_down = True
        logger.info("接收到终止信号,正在优雅关闭...")
        sys.exit(0)


# Install the handler for both interactive interrupts and kill signals.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, signal_handler)
def process_message(msg):
    """Decode a received Kafka message and dispatch it by topic.

    Args:
        msg: confluent_kafka Message object with a non-error payload.
    """
    try:
        topic = msg.topic()
        raw = msg.value()
        # Tombstone/empty messages carry no value; previously this crashed
        # on .decode() and surfaced as a generic processing error.
        if raw is None:
            logger.warning(f'Received empty (tombstone) message on topic {topic}; skipping')
            return
        value = raw.decode("utf-8")
        # Fix: `is not None` keeps an empty-bytes key instead of mapping it
        # to None (the old `if msg.key()` was falsy for b"").
        key = msg.key().decode("utf-8") if msg.key() is not None else None
        # Try to parse JSON; fall back to the raw string for non-JSON
        # payloads. An explicit variable replaces the fragile
        # `'payload' in locals()` introspection of the original.
        try:
            data = json.loads(value)
            logger.info(f'接收到消息 [主题: {topic}, 键: {key}]')
            logger.debug(f'消息内容: {data}')
        except json.JSONDecodeError:
            data = value
            logger.info(f'接收到非JSON消息 [主题: {topic}, 键: {key}]: {value}')
        # Dispatch to the per-channel handler based on the topic.
        if topic == TOPICS['email']:
            handle_email_notification(data)
        elif topic == TOPICS['sms']:
            handle_sms_notification(data)
        else:
            logger.warning(f'收到未知主题的消息: {topic}')
    except Exception as e:
        logger.error(f'处理消息时出错: {e}')
def handle_email_notification(payload):
    """Placeholder for the real e-mail delivery logic; currently just logs."""
    logger.info(f'处理邮件通知: {payload}')
def handle_sms_notification(payload):
    """Placeholder for the real SMS delivery logic; currently just logs."""
    logger.info(f'处理短信通知: {payload}')
def main():
    """Subscribe to all configured topics and poll until shutdown or error."""
    try:
        subscription = list(TOPICS.values())
        logger.info(f'订阅主题: {subscription}')
        c.subscribe(subscription)
        logger.info('开始消费消息...')
        while not shutting_down:
            msg = c.poll(1.0)  # 1-second poll timeout
            if msg is None:
                continue
            if msg.error():
                # End-of-partition is informational, not a failure.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logger.debug(f'到达分区末尾: {msg.topic()} [{msg.partition()}]')
                    continue
                # Any other error stops the consume loop.
                logger.error(f'Kafka错误: {msg.error()}')
                break
            process_message(msg)
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # Always close so the group rebalances promptly and offsets commit.
        logger.info("关闭消费者...")
        c.close()
        logger.info("消费者已关闭")


if __name__ == "__main__":
    main()
运行项目
打开终端运行命令
python producer.py
python consumer.py
可以看到终端输出正常
详细代码:https://github.com/wan88888/python-kafka-test