一、cppkafka简介
cppkafka是一个现代C++的Apache Kafka客户端库,它是对librdkafka的高级封装,旨在简化使用librdkafka的过程,同时保持最小的性能开销。
二、环境准备
2.1 系统要求
- librdkafka >= 0.9.4
- CMake >= 3.9.2
- C++11兼容的编译器(gcc >= 4.8)
- Boost库(用于boost::optional)
2.2 依赖安装(Ubuntu示例)
# 安装librdkafka
sudo apt-get install librdkafka-dev
# 安装CMake
sudo apt-get install cmake
# 安装Boost库
sudo apt-get install libboost-all-dev
# 验证g++版本
g++ --version
三、安装步骤
3.1 下载cppkafka
git clone https://github.com/mfontanini/cppkafka.git
cd cppkafka
3.2 编译安装
mkdir build
cd build
cmake -DRDKAFKA_ROOT=/path/to/librdkafka -DBOOST_ROOT=/path/to/boost ..
make
sudo make install
四、常见问题与解决方案
4.1 依赖库版本问题
问题描述:librdkafka版本过低导致编译或运行时错误
解决方案:
# 检查版本
pkg-config --modversion librdkafka
# 升级librdkafka
sudo apt-get install librdkafka-dev
4.2 CMake配置问题
问题描述:CMake找不到librdkafka路径
解决方案:
# 明确指定路径
cmake -DRDKAFKA_ROOT=/usr/local/ ..
4.3 编译器支持问题
问题描述:编译器不支持C++11
解决方案:
# 安装新版g++
sudo apt-get install g++-4.8
# 指定编译器
cmake -DCMAKE_CXX_COMPILER=/usr/bin/g++-4.8 ..
五、代码示例
5.1 生产者示例
#include <cppkafka/cppkafka.h>
using namespace std;
using namespace cppkafka;
int main() {
// 创建配置
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" }
};
// 创建生产者
Producer producer(config);
// 生产消息
string message = "Hello Kafka!";
producer.produce(MessageBuilder("my_topic")
.partition(0)
.payload(message));
// 刷新生产者
producer.flush();
return 0;
}
5.2 消费者示例
#include <cppkafka/cppkafka.h>
using namespace std;
using namespace cppkafka;
int main() {
// 创建配置
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" },
{ "group.id", "test_group" },
{ "auto.offset.reset", "latest" }
};
// 创建消费者
Consumer consumer(config);
consumer.subscribe({ "my_topic" });
while (true) {
// 消费消息
Message msg = consumer.poll();
if (msg) {
if (!msg.get_error()) {
cout << "Received: " << msg.get_payload() << endl;
}
}
}
return 0;
}
六、高级配置
6.1 处理大消息
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" },
{ "message.max.bytes", "10485760" }, // 生产者配置
{ "fetch.message.max.bytes", "40971520" } // 消费者配置
};
6.2 避免消费历史数据
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" },
{ "group.id", "unique_group_id" },
{ "enable.auto.commit", false },
{ "auto.offset.reset", "latest" }
};
七、安装流程图
八、总结
通过本指南,您应该能够:
- 正确安装cppkafka及其依赖
- 解决安装过程中的常见问题
- 编写基本的Kafka生产者和消费者代码
- 处理大消息和实时消费等高级场景
遇到问题时,建议查阅:
- cppkafka GitHub仓库
- librdkafka文档