【C/C++】从零开始掌握Kafka

news2025/12/18 20:34:08

文章目录

  • 从零开始掌握Kafka
    • 一、Kafka 基础知识理解(理论)
      • 1. 核心组件与架构
      • 2. 重点概念解析
    • 二、Kafka 面试重点知识梳理
    • 三、C++ 使用 Kafka 的实践(librdkafka)
      • 1. librdkafka 简介
      • 2. 安装 librdkafka
    • 四、实战:高吞吐生产者与消费者
      • 1. 生产者示例(Producer.cpp)
      • 2. 消费者示例(Consumer.cpp)
    • 五、Kafka 开发相关 C++ 能力
    • 六、推荐资料与开源项目

从零开始掌握Kafka

一、Kafka 基础知识理解(理论)

1. 核心组件与架构

组件作用
BrokerKafka 节点,负责存储消息
Topic消息主题,逻辑上的分类
Partition一个 Topic 的分片,支持并发与扩展性
Producer负责发送消息
Consumer负责消费消息
Consumer Group多消费者协作消费
Zookeeper / KRaft负责元数据与协调(未来版本转向 KRaft 模式)

2. 重点概念解析

  • Partition:分片,支持水平扩展(每个 partition 是一个有序日志)。

  • 副本机制(Replication):每个 Partition 有一个 leader + N 个 follower,保证高可用。

  • 消费者组(Consumer Group):Kafka 实现广播和负载均衡消费的机制。

  • offset 管理

    • 自动提交(enable.auto.commit)
    • 手动提交(commitSync / commitAsync)
    • Kafka 默认 offset 存在 __consumer_offsets topic 中。

二、Kafka 面试重点知识梳理

面试点说明
消息顺序性同一个 partition 内有顺序,跨 partition 无法保证
幂等性生产使用 enable.idempotence=true,避免 producer 重试造成重复发送
分布式一致性ISR 机制,消息写入需同步到 follower;ACK=all 实现强一致
消费位点提交手动提交 offset 是保证消费语义精确一次的关键
Rebalance 原理消费者上下线会触发 Rebalance,导致 partition 分配变化

三、C++ 使用 Kafka 的实践(librdkafka)

1. librdkafka 简介

  • 官方提供的高性能 C/C++ Kafka 客户端库。

  • GitHub 地址:https://github.com/edenhill/librdkafka

  • 支持:

    • 高吞吐的生产与消费
    • offset 提交
    • topic/partition 管理
    • 幂等发送、压缩、批处理

2. 安装 librdkafka

# Ubuntu
sudo apt-get install librdkafka-dev

# Or from source
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install

四、实战:高吞吐生产者与消费者

此处只是简单介绍,完整工程见kafka简单工程

1. 生产者示例(Producer.cpp)

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>

class ExampleEventCb : public RdKafka::EventCb {
  void event_cb(RdKafka::Event &event) override {
    if (event.type() == RdKafka::Event::EVENT_ERROR) {
      std::cerr << "Kafka Error: " << event.str() << std::endl;
    }
  }
};

int main() {
  std::string brokers = "kafka:9092";
  std::string topic_str = "test_topic";
  std::string errstr;

  // 配置
  ExampleEventCb event_cb;
  std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
  conf->set("bootstrap.servers", brokers, errstr);
  conf->set("event_cb", &event_cb, errstr);

  // 创建 producer
  std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
  if (!producer) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    return 1;
  }

  // 创建 Topic
  std::unique_ptr<RdKafka::Topic> topic(RdKafka::Topic::create(producer.get(), topic_str, nullptr, errstr));
  if (!topic) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    return 1;
  }

  std::string message = "Hello from C++ Kafka Producer!";
  RdKafka::ErrorCode resp = producer->produce(
    topic.get(),                            // topic ptr
    RdKafka::Topic::PARTITION_UA,           // partition
    RdKafka::Producer::RK_MSG_COPY,         // message flags
    const_cast<char *>(message.c_str()),    // payload
    message.size(),                         // payload size
    nullptr,                                // optional key
    nullptr);                               // opaque

  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
  } else {
    std::cout << "Message sent successfully\n";
  }

  producer->flush(3000);
  return 0;
}


}

2. 消费者示例(Consumer.cpp)

#include <librdkafka/rdkafkacpp.h>
#include <iostream>
#include <csignal>
#include <memory>

bool running = true;

void signal_handler(int) {
  running = false;
}

class ExampleEventCb : public RdKafka::EventCb {
  void event_cb(RdKafka::Event &event) override {
    if (event.type() == RdKafka::Event::EVENT_ERROR) {
      std::cerr << "Kafka Error: " << event.str() << std::endl;
    }
  }
};

int main() {
  signal(SIGINT, signal_handler);

  std::string brokers = "kafka:9092";
  std::string topic = "test_topic";
  std::string group_id = "cpp_consumer_group";

  std::string errstr;
  ExampleEventCb event_cb;

  auto conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  conf->set("bootstrap.servers", brokers, errstr);
  conf->set("group.id", group_id, errstr);
  conf->set("auto.offset.reset", "earliest", errstr);
  conf->set("event_cb", &event_cb, errstr);

  auto consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    return 1;
  }

  consumer->subscribe({topic});
  std::cout << "Consuming messages from topic " << topic << std::endl;

  while (running) {
    auto msg = consumer->consume(1000);
    if (msg->err() == RdKafka::ERR_NO_ERROR) {
      std::string message(reinterpret_cast<const char*>(msg->payload()), msg->len());
		std::cout << "Received message: " << message << std::endl;
    }
    }
    delete msg;
  }

  consumer->close();
  delete consumer;
  return 0;
}



五、Kafka 开发相关 C++ 能力

  • 熟练使用 RAII、智能指针、异常处理

  • 理解线程安全、异步模型(poll, callback)

  • 能够结合 JSON/XML 配置 Kafka 客户端

  • 编写模块化、高性能的消息收发组件

  • 构建系统:CMake

  • 日志:spdlog 或 glog

  • 单元测试:gtest

  • JSON:nlohmann/json


六、推荐资料与开源项目

  • 📚 Kafka 权威指南(原书第2版)

  • 📘 librdkafka 文档

  • 📖 Apache Kafka 官方文档

  • 💻 开源项目参考:

    • confluent-kafka-cpp
    • cppkafka(封装更现代 C++)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2386057.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简单血条于小怪攻击模板

创建一个2d正方形&#xff08;9-Sliced&#xff09;命名为Player&#xff0c;在Player下面新建一个画布&#xff08;Canvas&#xff09;命名为PlayerHealthUI&#xff0c;在画布下面新建一个滑动条&#xff08;Slider&#xff09;命名为HealthBar 把PlayerHealthUI脚本挂载到Pl…

代码随想录算法训练营第四十六四十七天

卡码网题目: 110. 字符串接龙105. 有向图的完全联通106. 岛屿的周长107. 寻找存在的路径 其他: 今日总结 往期打卡 110. 字符串接龙 跳转: 110. 字符串接龙 学习: 代码随想录公开讲解 问题: 字典 strList 中从字符串 beginStr 和 endStr 的转换序列是一个按下述规格形成的序…

华硕FL8000U加装16G+32G=48G内存条

华硕FL8000U加装16G32G48G内存条 一、华硕FL8000U加装内存条endl 一、华硕FL8000U加装内存条 相关视频链接: https://www.bilibili.com/video/BV1gw4dePED8/ endl

勇闯Chromium—— Chromium的多进程架构

问题 构建一个永不崩溃或挂起的渲染引擎几乎是不可能的,构建一个绝对安全的渲染引擎也几乎是不可能的。 从某种程度上来说,2006 年左右的网络浏览器状态与过去单用户、协作式多任务操作系统的状况类似。正如在这样的操作系统中,一个行为不端的应用程序可能导致整个系统崩溃…

软件质量保证与测试实验

课程  软件质量保证与测试 目的&#xff1a;练习软件测试中白盒测试方法 内容&#xff1a; 测试如下程序段&#xff1a; #include <stdio.h>int main() {int i 1, n1 0, n2 0;float sum 0.0;float average;float score[100];printf("请输入分…

历年华东师范大学保研上机真题

2025华东师范大学保研上机真题 2024华东师范大学保研上机真题 2023华东师范大学保研上机真题 在线测评链接&#xff1a;https://pgcode.cn/school?classification1 简单一位数代数式计算 题目描述 给一个小学生都会算的1位数与1位数运算的代数式&#xff0c;请你求出这个表…

在机器学习中,L2正则化为什么能够缓过拟合?为何正则化等机制能够使一个“过度拟合训练集”的模型展现出更优的泛化性能?正则化

在现代机器学习的发展历程中&#xff0c;过拟合&#xff08;Overfitting&#xff09;始终是亟需克服的重要挑战。其表现如同在训练数据上构建过度复杂的映射函数&#xff0c;虽能实现近乎完美的拟合&#xff0c;但其泛化能力却显著受限&#xff0c;导致模型在测试集或实际应用中…

k8s部署ELK补充篇:kubernetes-event-exporter收集Kubernetes集群中的事件

k8s部署ELK补充篇&#xff1a;kubernetes-event-exporter收集Kubernetes集群中的事件 文章目录 k8s部署ELK补充篇&#xff1a;kubernetes-event-exporter收集Kubernetes集群中的事件一、kubernetes-event-exporter简介二、kubernetes-event-exporter实战部署1. 创建Namespace&a…

C++性能相关的部分内容

C性能相关的部分内容 与底层硬件紧密结合 大端存储和小端存储&#xff08;硬件概念&#xff09; C在不同硬件上运行的结果可能不同 比如&#xff1a;输入01234567&#xff0c;对于大端存储的硬件会先在较大地址上先进行存储&#xff0c;而对于小端存储的硬件会先在较小地址上…

AI进行提问、改写、生图、联网搜索资料,嘎嘎方便!

极客侧边栏-AI板块 目前插件内已接入DeepSeek-R1满血版、Qwen3满血版 、豆包/智谱最新发布的推理模型以及各种顶尖AI大模型&#xff0c;并且目前全都可以免费不限次数使用&#xff0c;秒回不卡顿&#xff0c;联网效果超好&#xff01; 相比于市面上很多AI产品&#xff0c;极客…

GStreamer开发笔记(四):ubuntu搭建GStreamer基础开发环境以及基础Demo

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://blog.csdn.net/qq21497936/article/details/147714800 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、O…

2021年认证杯SPSSPRO杯数学建模A题(第二阶段)医学图像的配准全过程文档及程序

2021年认证杯SPSSPRO杯数学建模 A题 医学图像的配准 原题再现&#xff1a; 图像的配准是图像处理领域中的一个典型问题和技术难点&#xff0c;其目的在于比较或融合同一对象在不同条件下获取的图像。例如为了更好地综合多种信息来辨识不同组织或病变&#xff0c;医生可能使用…

CV中常用Backbone-3:Clip/SAM原理以及代码操作

前面已经介绍了简单的视觉编码器&#xff0c;这里主要介绍多模态中使用比较多的两种backbone&#xff1a;1、Clip&#xff1b;2、SAM。对于这两个backbone简单介绍基本原理&#xff0c;主要是讨论使用这个backbone。 1、CV中常用Backbone-2&#xff1a;ConvNeXt模型详解 2、CV中…

RPC 协议详解、案例分析与应用场景

一、RPC 协议原理详解 RPC 协议的核心目标是让开发者像调用本地函数一样调用远程服务&#xff0c;其实现过程涉及多个关键组件与流程。 &#xff08;一&#xff09;核心组件 客户端&#xff08;Client&#xff09;&#xff1a;发起远程过程调用的一方&#xff0c;它并不关心调…

dify-plugin-daemon的.env配置文件

源码位置&#xff1a;dify-plugin-daemon\.env 本文使用dify-plugin-daemon v0.1.0版本&#xff0c;主要总结了dify-plugin-daemon\.env配置文件。为了本地调试方便&#xff0c;采用本地运行时环境WSL2Ubuntu22.04方式运行dify-plugin-daemon服务。 一.服务器基本配置 服务器…

(九)PMSM驱动控制学习---无感控制之高阶滑膜观测器

在之前的文章中&#xff0c;我们介绍了永磁同步电机无感控制中的滑模观测器&#xff0c;但是同时我们也认识到了他的缺点&#xff1a;因符号函数带来的高频切换分量&#xff0c;使用低通滤波器引发相位延迟&#xff1b;在本篇文章&#xff0c;我们将会介绍高阶滑模观测器的无感…

Devicenet主转Profinet网关助力改造焊接机器人系统智能升级

某汽车零部件焊接车间原有6台焊接机器人&#xff08;采用Devicenet协议&#xff09;需与新增的西门子S7-1200 PLC&#xff08;Profinet协议&#xff09;组网。若更换所有机器人控制器或上位机系统&#xff0c;成本过高且停产周期长。 《解决方案》 工程师选择稳联技术转换网关…

《STL--list的使用及其底层实现》

引言&#xff1a; 上次我们学习了容器vector的使用及其底层实现&#xff0c;今天我们再来学习一个容器list&#xff0c; 这里的list可以参考我们之前实现的单链表&#xff0c;但是这里的list是双向循环带头链表&#xff0c;下面我们就开始list的学习了。 一&#xff1a;list的…

python的pip怎么配置的国内镜像

以下是配置pip国内镜像源的详细方法&#xff1a; 常用国内镜像源列表 清华大学&#xff1a;https://pypi.tuna.tsinghua.edu.cn/simple阿里云&#xff1a;https://mirrors.aliyun.com/pypi/simple中科大&#xff1a;https://pypi.mirrors.ustc.edu.cn/simple华为云&#xff1…

PCB 通孔是电容性的,但不一定是电容器

哼&#xff1f;……这是什么意思&#xff1f;…… 多年来&#xff0c;流行的观点是 PCB 通孔本质上是电容性的&#xff0c;因此可以用集总电容器进行建模。虽然当信号的上升时间大于或等于过孔不连续性延迟的 3 倍时&#xff0c;这可能是正确的&#xff0c;但我将向您展示为什…