使用Python构建Kafka示例项目

news2025/5/10 17:29:28

新建项目

mkdir python-kafka-test
cd python-kafka-test

安装依赖

pip install confluent_kafka

创建配置文件

# Kafka配置文件

# Kafka服务器配置
KAFKA_CONFIG = {
    'bootstrap.servers': 'localhost:9092',
    # 生产者特定配置
    'producer': {
        'client.id': 'python-kafka-producer',
        'acks': 'all',                 # 确保消息被所有副本确认
        'retries': 3,                  # 重试次数
        'retry.backoff.ms': 1000,      # 重试间隔
        'batch.size': 16384,           # 批处理大小
        'linger.ms': 5,                # 等待时间以允许更多消息加入批次
        'compression.type': 'snappy',  # 压缩类型
    },
    # 消费者特定配置
    'consumer': {
        'group.id': 'notification-group',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'auto.commit.interval.ms': 5000,
        'session.timeout.ms': 30000,
        'max.poll.interval.ms': 300000,
        'heartbeat.interval.ms': 10000,
    }
}

# 主题配置
TOPICS = {
    'email': 'email-topic',
    'sms': 'sms-topic'
}

创建Kafka生产者

import json
import logging
import signal
import sys
from confluent_kafka import Producer
from config import KAFKA_CONFIG, TOPICS

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('kafka-producer')

# 合并配置
producer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('producer', {})}
# 移除嵌套的producer配置,避免冲突
if 'producer' in producer_config:
    del producer_config['producer']
if 'consumer' in producer_config:
    del producer_config['consumer']

# 创建Producer实例
p = Producer(producer_config)

# 标记是否正在关闭
shutting_down = False

def signal_handler(sig, frame):
    """处理终止信号,确保优雅关闭"""
    global shutting_down
    if shutting_down:
        return
    shutting_down = True
    logger.info("接收到终止信号,正在优雅关闭...")
    # 确保所有消息都被发送
    p.flush(10)  # 等待最多10秒
    logger.info("生产者已关闭")
    sys.exit(0)

# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def delivery_report(err, msg):
    """消息发送回调函数"""
    if err is not None:
        logger.error(f'消息发送失败: {err}')
    else:
        logger.info(f'消息已发送到 {msg.topic()} [分区 {msg.partition()}]')

def send_notification(topic_key, payload, key=None):
    """发送通知消息到指定主题
    
    Args:
        topic_key: 主题键名(在TOPICS字典中定义)
        payload: 消息内容(字典或JSON字符串)
        key: 可选的消息键
    
    Returns:
        bool: 是否成功将消息加入发送队列
    """
    try:
        # 获取实际主题名
        topic = TOPICS.get(topic_key, topic_key)
        
        # 如果payload是字典,转换为JSON字符串
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        
        # 发送消息
        p.produce(
            topic, 
            payload.encode('utf-8'), 
            key=key.encode('utf-8') if key else None,
            callback=delivery_report
        )
        # 轮询一次以触发回调
        p.poll(0)
        
        logger.info(f'消息已加入发送队列: {topic}')
        return True
    except Exception as e:
        logger.error(f'发送消息时出错: {e}')
        return False

# 使用示例
if __name__ == "__main__":
    try:
        # 发送邮件通知
        email_payload = {
            "to": "receiver@example.com", 
            "from": "sender@example.com", 
            "subject": "Sample Email", 
            "body": "This is a sample email notification"
        }
        send_notification('email', email_payload)
        
        # 发送短信通知
        sms_payload = {
            "phoneNumber": "1234567890", 
            "message": "This is a sample SMS notification"
        }
        send_notification('sms', sms_payload)
        
        # 确保所有消息都被发送
        remaining = p.flush(timeout=5)
        if remaining > 0:
            logger.warning(f'仍有 {remaining} 条消息未发送完成')
        else:
            logger.info('所有消息已成功发送')
            
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # 确保所有消息都被发送
        p.flush(timeout=5)

创建Kafka消费者

import json
import logging
import signal
import sys
from confluent_kafka import Consumer, KafkaError
from config import KAFKA_CONFIG, TOPICS

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger('kafka-consumer')

# 合并配置
consumer_config = {**KAFKA_CONFIG, **KAFKA_CONFIG.get('consumer', {})}
# 移除嵌套的配置,避免冲突
if 'producer' in consumer_config:
    del consumer_config['producer']
if 'consumer' in consumer_config:
    del consumer_config['consumer']

# 创建Consumer实例
c = Consumer(consumer_config)

# 标记是否正在关闭
shutting_down = False

def signal_handler(sig, frame):
    """处理终止信号,确保优雅关闭"""
    global shutting_down
    if shutting_down:
        return
    shutting_down = True
    logger.info("接收到终止信号,正在优雅关闭...")
    sys.exit(0)

# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

def process_message(msg):
    """处理接收到的消息
    
    Args:
        msg: Kafka消息对象
    """
    try:
        topic = msg.topic()
        value = msg.value().decode("utf-8")
        key = msg.key().decode("utf-8") if msg.key() else None
        
        # 尝试解析JSON
        try:
            payload = json.loads(value)
            logger.info(f'接收到消息 [主题: {topic}, 键: {key}]')
            logger.debug(f'消息内容: {payload}')
        except json.JSONDecodeError:
            logger.info(f'接收到非JSON消息 [主题: {topic}, 键: {key}]: {value}')
        
        # 根据主题类型处理不同的消息
        if topic == TOPICS['email']:
            handle_email_notification(payload if 'payload' in locals() else value)
        elif topic == TOPICS['sms']:
            handle_sms_notification(payload if 'payload' in locals() else value)
        else:
            logger.warning(f'收到未知主题的消息: {topic}')
            
    except Exception as e:
        logger.error(f'处理消息时出错: {e}')

def handle_email_notification(payload):
    """处理邮件通知"""
    # 这里实现实际的邮件发送逻辑
    logger.info(f'处理邮件通知: {payload}')

def handle_sms_notification(payload):
    """处理短信通知"""
    # 这里实现实际的短信发送逻辑
    logger.info(f'处理短信通知: {payload}')

def main():
    try:
        # 订阅主题
        topics_to_subscribe = list(TOPICS.values())
        logger.info(f'订阅主题: {topics_to_subscribe}')
        c.subscribe(topics_to_subscribe)
        
        logger.info('开始消费消息...')
        while not shutting_down:
            msg = c.poll(1.0)  # 超时时间1秒
            
            if msg is None:
                continue
                
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # 到达分区末尾,不是错误
                    logger.debug(f'到达分区末尾: {msg.topic()} [{msg.partition()}]')
                    continue
                else:
                    # 其他错误
                    logger.error(f'Kafka错误: {msg.error()}')
                    break
            
            # 处理消息
            process_message(msg)
            
    except KeyboardInterrupt:
        logger.info("程序被用户中断")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # 关闭消费者
        logger.info("关闭消费者...")
        c.close()
        logger.info("消费者已关闭")

if __name__ == "__main__":
    main()

运行项目

打开终端运行命令

python producer.py
python consumer.py

可以看到终端输出正常

详细代码:https://github.com/wan88888/python-kafka-test

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2327734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

本地化部署DeepSeek-R1蒸馏大模型:基于飞桨PaddleNLP 3.0的实战指南

目录 一、飞桨框架3.0:大模型推理新范式的开启1.1 自动并行机制革新:解放多卡推理1.2 推理-训练统一设计:一套代码全流程复用 二、本地部署DeepSeek-R1-Distill-Llama-8B的实战流程2.1 机器环境说明2.2 模型与推理脚本准备2.3 启动 Docker 容…

VBA 64位API声明语句第008讲

跟我学VBA,我这里专注VBA, 授人以渔。我98年开始,从源码接触VBA已经20余年了,随着年龄的增长,越来越觉得有必要把这项技能传递给需要这项技术的职场人员。希望职场和数据打交道的朋友,都来学习VBA,利用VBA,起码可以提高…

Linux信号——信号的保存(2)

关于core和term两种终止方式 core是什么? 将进程在内存中的核心数据(与调试有关)转存到磁盘中形成core,core.pid的文件。 core dump:核心转储。 core与term的区别: term只是普通的终止,而core终止方式还要…

【蓝桥杯嵌入式——学习笔记一】2016年第七届省赛真题重难点解析记录,闭坑指南(文末附完整代码)

在读题过程中发现本次使用的是串口2,需要配置串口2。 但在查看产品手册时发现PA14同时也是SWCLK。 所以在使用串口2时需要拔下跳线帽去连接CH340。 可能是用到串口2的缘故,在烧录时发现报了一个错误。这时我们要想烧录得按着复位键去点击烧录&#xff0c…

基础常问 (概念、代码)

读源码 代码题 Void方法 ,也可以提前rerun;结束 RandomAccessFile类(随机访问文件) 在 Java 中,可以使用RandomAccessFile类来实现文件指针操作。RandomAccessFile提供了对文件内容的随机访问功能,它的文件指针可以通…

J1 ResNet-50算法实战与解析

🍨 本文為🔗365天深度學習訓練營 中的學習紀錄博客🍖 原作者:K同学啊 | 接輔導、項目定制 一、理论知识储备 1. 残差网络的由来 ResNet主要解决了CNN在深度加深时的退化问题(梯度消失与梯度爆炸)。 虽然B…

[MySQL初阶]MySQL(8)索引机制:下

标题:[MySQL初阶]MySQL(8)索引机制:下 水墨不写bug 文章目录 四、从问题到底层,从现象到本质1.为什么插入的数据默认排好序2.MySQL的Page(1)为什么选择用Page?(2&#x…

Muduo网络库实现 [九] - EventLoopThread模块

目录 设计思路 类的设计 模块的实现 私有接口 公有接口 设计思路 我们说过一个EventLoop要绑定一个线程,未来该EventLoop所管理的所有的连接的操作都需要在这个EventLoop绑定的线程中进行,所以我们该如何实现将EventLoop和线程绑定呢?…

《K230 从熟悉到...》识别机器码(AprilTag)

《K230 从熟悉到...》识别机器码(aprirltag) tag id 《庐山派 K230 从熟悉到...》 识别机器码(AprilTag) AprilTag是一种基于二维码的视觉标记系统,最早是由麻省理工学院(MIT)在2008年开发的。A…

栈和队列的概念

1.栈的概念 只允许在固定的一端进行插入和删除,进行数据的插入和数据的删除操作的一端数栈顶,另一端称为栈底。 栈中数据元素遵循后进先出LIFO (Last In First Out) 压栈:栈的插入。 出栈:栈的删除。出入数据在栈顶。 那么下面…

红日靶场一实操笔记

一,网络拓扑图 二,信息搜集 1.kali机地址:192.168.50.129 2.探测靶机 注:需要win7开启c盘里面的phpstudy的服务。 nmap -sV -Pn 192.168.50.128 或者扫 nmap -PO 192.168.50.0/24 可以看出来win7(ip为192.168.50.128)的靶机开…

【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解

【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解 文章目录 【目标检测】【深度学习】【Pytorch版本】YOLOV2模型算法详解前言YOLOV2的模型结构YOLOV2模型的基本执行流程YOLOV2模型的网络参数YOLOV2模型的训练方式 YOLOV2的核心思想前向传播阶段反向传播阶段 总结 前…

NineData云原生智能数据管理平台新功能发布|2025年3月版

本月发布 15 项更新,其中重点发布 3 项、功能优化 11 项、性能优化 1 项。 重点发布 基础服务 - MFA 多因子认证 新增 MFA 多因子认证,提升账号安全性。系统管理员开启后,所有组织成员需绑定认证器,登录时需输入动态验证码。 数…

GLSL(OpenGL 着色器语言)基础语法

GLSL(OpenGL 着色器语言)基础语法 GLSL(OpenGL Shading Language)是 OpenGL 计算着色器的语言,语法类似于 C 语言,但提供了针对 GPU 的特殊功能,如向量运算和矩阵运算。 着色器的开头总是要声明…

Redis基础知识-3

RedisTemplate对多种数据结构的操作 1. String类型 示例代码: // 保存数据 redisTemplate.opsForValue().set("user:1001", "John Doe"); // 设置键值对,无过期时间 redisTemplate.opsForValue().set("user:1002", &qu…

unity各个面板说明

游戏开发,unity各个面板说明 提示:帮帮志会陆续更新非常多的IT技术知识,希望分享的内容对您有用。本章分享的是Python基础语法。前后每一小节的内容是存在的有:学习and理解的关联性,希望对您有用~ unity简介-unity基础…

游戏引擎学习第199天

回顾并发现我们可能破坏了某些东西 目前,我们的调试 UI 运行得相对顺利,可以创建可修改的调试变量,也可以插入分析器(profiler)等特殊视图组件,并进行一些交互操作。然而,在上一次结束时&#…

Linux红帽:RHCSA认证知识讲解(十)使用 tar创建归档和压缩文件

Linux红帽:RHCSA认证知识讲解(十)使用 tar创建归档和压缩文件 前言一、归档与压缩的基本概念1.1 归档与压缩的区别 二、使用tar创建归档文件2.1 tar命令格式2.2 示例操作 三、使用tar进行压缩3.2 命令格式3.3 示例操作 前言 在红帽 Linux 系…

端到端机器学习流水线(MLflow跟踪实验)

目录 端到端机器学习流水线(MLflow跟踪实验)1. 引言2. 项目背景与意义2.1 端到端机器学习流水线的重要性2.2 MLflow的作用2.3 工业级数据处理需求3. 数据集生成与介绍3.1 数据集构成3.2 数据生成方法4. 机器学习流水线与MLflow跟踪4.1 端到端机器学习流水线4.2 MLflow跟踪实验…

相平面案例分析爱情故事

动态系统的分析可以分为三个步骤:第一步描述系统,通过语言来描述系统的特性,第一步描述系统,即通过语言来描述系统的特性;第二步数学分析,即使用数学工具对系统进行量化解析;第三步结果与讨论&a…