【C/C++】记录一次麻烦的Kafka+Json体验

news2025/6/1 6:40:27

文章目录

  • 麻烦的Kafka+Json体验
    • 1 目标
    • 2 工程搭建
      • 2.1 docker配置
      • 2.2 代码
      • 2.3 工程压缩包
    • 3 执行结果

麻烦的Kafka+Json体验

1 目标

初心:结合kafka + json + docker,验证基本的数据生产/消费。

Kafka 配合 JSON 工具,主要是为了数据的序列化和反序列化,以及便于消息内容的格式化、解析和处理(让消息内容可读、结构化、标准化,方便发送、接收和处理)。

  1. 数据格式标准化
    Kafka 本身是一个消息队列系统,消息的内容可以是任意字节流。使用 JSON 作为消息格式,能够让消息结构清晰、规范,方便发送方和接收方统一约定消息格式。

  2. 跨语言和跨平台兼容
    JSON 是一种轻量级数据交换格式,几乎所有编程语言都支持 JSON 的解析和生成。这让 Kafka 发送的消息可以被不同语言和平台的消费者方便地处理。

  3. 方便调试和监控
    JSON 格式是文本格式,易于阅读和打印,方便开发和运维人员在调试、日志查看时快速理解消息内容。

  4. 灵活的消息结构
    JSON 支持嵌套结构、数组、键值对等灵活的数据组织方式,适合表达复杂的数据模型。

  5. 序列化/反序列化工具支持
    常见的 Kafka 客户端库通常提供 JSON 序列化器(Serializer)和反序列化器(Deserializer),让你可以方便地将业务对象转换成 JSON 字符串发送到 Kafka,或从 JSON 字符串转换回业务对象。

  6. 与 schema 注册中心配合
    虽然 JSON 本身没有强类型约束,但通过结合 JSON Schema 和 schema 注册中心(如 Confluent Schema Registry),可以保证数据格式的一致性和兼容性。


2 工程搭建

2.1 docker配置

docker-compose.yml

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  dev:
    build:
      context: .
      dockerfile: Dockerfile.dev  # ✅ 使用构建好的开发镜像
    container_name: cpp_dev
    tty: true
    stdin_open: true
    working_dir: /home/dev/code
    volumes:
      - ./cpp_kafka_code:/home/dev/code
    depends_on:
      - kafka

Dockerfile.dev

FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    build-essential \
    cmake \
    git \
    curl \
    wget \
    pkg-config \
    vim \
    librdkafka-dev \
    libssl-dev \
    libzstd-dev \
    libjsoncpp-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# 安装 nlohmann/json(如未预装)
# RUN wget https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz && \
#    tar -xf json.tar.xz && \
#    cp -r single_include/nlohmann /usr/include/ && \
#    rm -rf json.tar.xz single_include
RUN apt-get update && apt-get install -y \
    nlohmann-json3-dev

# 创建开发用户
RUN useradd -ms /bin/bash dev
USER dev
WORKDIR /home/dev/code

2.2 代码

cpp_kafka_code/
├── CMakeLists.txt      
├── include/
│   ├── KafkaProducer.h
│   ├── KafkaConsumer.h
│   ├── ConfigManager.h
│   └── Message.h
├── src/
│   ├── KafkaProducer.cpp
│   ├── KafkaConsumer.cpp
│   ├── ConfigManager.cpp
│   └── Message.cpp
├── config/
│   └── kafka_config.json
├── test/
│   ├── producer_test.cpp
│   └── consumer_test.cpp

CMakeLists.txt

cmake_minimum_required(VERSION 3.14)
project(KafkaModule)

set(CMAKE_CXX_STANDARD 17)

set(ENV{PKG_CONFIG_PATH} "/usr/lib/x86_64-linux-gnu/pkgconfig")

# 查找依赖库
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED rdkafka)

# 添加头文件路径
include_directories(
    ${CMAKE_SOURCE_DIR}/include
	${RDKAFKA_INCLUDE_DIRS}
	/usr/include/librdkafka
)

link_directories(${RDKAFKA_LIBRARY_DIRS})

# 源码文件
file(GLOB SOURCES
    src/*.cpp
)

# 可执行测试文件
add_executable(producer_test test/producer_test.cpp ${SOURCES})
add_executable(consumer_test test/consumer_test.cpp ${SOURCES})

# 链接依赖库
# target_link_libraries(producer_test PkgConfig::RDKAFKA)
# target_link_libraries(consumer_test PkgConfig::RDKAFKA)
target_link_libraries(producer_test ${RDKAFKA_LIBRARIES})
target_link_libraries(consumer_test ${RDKAFKA_LIBRARIES})

config/kafka_config.json

{
  "bootstrap.servers": "kafka:9092",
  "group.id": "my-group",
  "auto.offset.reset": "earliest",
  "enable.auto.commit": "false"
}

include/ConfigManager.h

#pragma once
#include <string>
#include <nlohmann/json.hpp>

struct KafkaConfig {
    std::string brokers;
    std::string groupId;
    bool enableIdempotence = true;
    int batchSize = 100;
    int lingerMs = 5;
};

class ConfigManager {
public:
    static KafkaConfig loadConfig(const std::string& filename);
};

include/KafkaConsumer.h

#pragma once

#include <string>
#include <librdkafka/rdkafka.h>

class KafkaConsumer {
public:
    KafkaConsumer(const std::string& configFile, const std::string& topic);
    ~KafkaConsumer();

    bool poll(std::string& outMessage);

private:
    rd_kafka_t* rk_;
    rd_kafka_conf_t* conf_;
    rd_kafka_topic_partition_list_t* topics_;
};

include/KafkaProducer.h

#pragma once
#include <string>
#include <rdkafka.h>

class KafkaProducer {
public:
    explicit KafkaProducer(const std::string& configFile);
    bool send(const std::string& topic, const std::string& message);
    ~KafkaProducer();

private:
    rd_kafka_t* producer_ = nullptr;
    rd_kafka_conf_t* conf_ = nullptr;
};

include/Message.h

#pragma once
#include <string>
#include <nlohmann/json.hpp>

class Message {
public:
    Message() = default;
    explicit Message(const std::string& jsonStr);
    std::string toJson() const;
    void setField(const std::string& key, const std::string& value);
    std::string getField(const std::string& key) const;

private:
    nlohmann::json data_;
};

src/ConfigManager.cpp

#include "ConfigManager.h"
#include <fstream>

KafkaConfig ConfigManager::loadConfig(const std::string& filename) {
    std::ifstream in(filename);
    nlohmann::json j;
    in >> j;
    KafkaConfig cfg;
    cfg.brokers = j["bootstrap.servers"];
    cfg.groupId = j["group.id"];
    cfg.enableIdempotence = j.value("enable_idempotence", true);
    cfg.batchSize = j.value("batch_size", 100);
    cfg.lingerMs = j.value("linger_ms", 5);
    return cfg;
}

src/KafkaConsumer.cpp


#include "KafkaConsumer.h"
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>

KafkaConsumer::KafkaConsumer(const std::string& configFile, const std::string& topic) {
    std::ifstream file(configFile);
    if (!file.is_open()) {
        std::cerr << "Failed to open config file: " << configFile << "\n";
        exit(1);
    }

    nlohmann::json configJson;
    file >> configJson;

    char errstr[512];
    conf_ = rd_kafka_conf_new();
    // for (auto& el : configJson.items()) {
    //     std::string valueStr = el.value().dump();

    //     if (rd_kafka_conf_set(conf_, el.key().c_str(), valueStr.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    //         std::cerr << "Config error: " << errstr << "\n";
    //     }
    // }

    for (auto& el : configJson.items()) {
        std::string valStr;
        if (el.value().is_string()) {
            valStr = el.value().get<std::string>();
        } else {
            valStr = el.value().dump();  // 数字或布尔等用dump转换成字符串
        }
        if (rd_kafka_conf_set(conf_, el.key().c_str(), valStr.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            std::cerr << "Config error: " << errstr << "\n";
        }
    }

    rk_ = rd_kafka_new(RD_KAFKA_CONSUMER, conf_, errstr, sizeof(errstr));
    if (!rk_) {
        std::cerr << "Failed to create consumer: " << errstr << "\n";
        exit(1);
    }

    // Subscribe to topic
    topics_ = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics_, topic.c_str(), -1);
    rd_kafka_subscribe(rk_, topics_);
}

KafkaConsumer::~KafkaConsumer() {
    rd_kafka_consumer_close(rk_);
    rd_kafka_destroy(rk_);
    rd_kafka_topic_partition_list_destroy(topics_);
}

bool KafkaConsumer::poll(std::string& outMessage) {

    rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk_, 1000);
    if (!msg) return false;

    if (msg->err) {
        std::cerr << "Consumer error: " << rd_kafka_message_errstr(msg) << "\n";
        rd_kafka_message_destroy(msg);
        return false;
    }

    std::cout << "Raw message hex: ";
    for (unsigned int i = 0; i < msg->len; ++i) {
        printf("%02X ", ((unsigned char*)msg->payload)[i]);
    }
    std::cout << std::endl;

    outMessage = std::string((char*)msg->payload, msg->len);

    // if (!outMessage.empty()) {
    //     std::cout << "Test outMessage: " << outMessage << std::endl;
    // }

    rd_kafka_message_destroy(msg);
    return true;
}


src/KafkaProducer.cpp

#include "KafkaProducer.h"
#include "ConfigManager.h"
#include <iostream>

KafkaProducer::KafkaProducer(const std::string& configFile) {
    KafkaConfig cfg = ConfigManager::loadConfig(configFile);
    conf_ = rd_kafka_conf_new();
    rd_kafka_conf_set(conf_, "bootstrap.servers", cfg.brokers.c_str(), nullptr, 0);
    if (cfg.enableIdempotence) {
        rd_kafka_conf_set(conf_, "enable.idempotence", "true", nullptr, 0);
    }
    producer_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, nullptr, 0);
}

bool KafkaProducer::send(const std::string& topic, const std::string& message) {

    for (unsigned char c : message) {
        printf("%02X ", c);
    }
    printf("\n");

    rd_kafka_resp_err_t err = rd_kafka_producev(
        producer_,
        RD_KAFKA_V_TOPIC(topic.c_str()),
        RD_KAFKA_V_VALUE(const_cast<char*>(message.c_str()), message.size()),
        RD_KAFKA_V_END);
    return err == RD_KAFKA_RESP_ERR_NO_ERROR;
}

KafkaProducer::~KafkaProducer() {
    rd_kafka_flush(producer_, 3000);
    rd_kafka_destroy(producer_);
}

src/Message.cpp

#include "Message.h"
#include <iostream>

Message::Message(const std::string& jsonStr) {
    data_ = nlohmann::json::parse(jsonStr);
}

std::string Message::toJson() const {
    if (data_.empty()) {
        std::cerr << "[Message::toJson] Warning: data_ is empty.\n";
    }
    return data_.dump();
}


void Message::setField(const std::string& key, const std::string& value) {
    data_[key] = value;
}

std::string Message::getField(const std::string& key) const {
    return data_.value(key, "");
}


test/consumer_test.cpp

#include "KafkaConsumer.h"
#include <iostream>

int main() {
    KafkaConsumer consumer("../config/kafka_config.json", "test_topic");
    std::cout << "Consumer started. Waiting for messages...\n";

    while (true) {
        std::string msg;
        if (consumer.poll(msg)) {
            std::cout << "Received: " << msg << std::endl;
        }
    }

    return 0;
}

test/producer_test.cpp

#include "KafkaProducer.h"
#include "Message.h"
#include <iostream>

int main() {
    Message msg;
    msg.setField("type", "test");
    msg.setField("payload", "hello");
    std::string jsonStr = msg.toJson();
    std::cout << "序列化结果: " << jsonStr << std::endl;

    Message parsed(jsonStr);
    std::cout << "解析 payload: " << parsed.getField("payload") << std::endl;

//    KafkaProducer producer("config/kafka_config.json");
    Message msg1;
    msg1.setField("type", "test");
    msg1.setField("payload", "hello from producer");
	std::cout << "Send message: " << msg1.toJson() << std::endl;

    KafkaProducer producer("../config/kafka_config.json");
	
	// if (producer.send("test_topic", msg1.toJson())) {
    //     std::cout << "Message sent successfully!\n";
    // } else {
    //     std::cout << "Message send failed.\n";
    // }


    std::string testStr = R"({"type":"test","payload":"hello from producer"})";
    producer.send("test_topic", testStr);

    return 0;
}


2.3 工程压缩包

3 执行结果

当前还存在一点bug,produce的数据,consumer接收时,前面总是出现乱码,应该时序列化/反序列化导致的,但是当前还未找到原因,
等我调试好后的好消息吧!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2392203.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系列-2 Shell常用命令收集

背景 本文用于收集Linux常用命令(基于Centos7)&#xff0c;是一个持续更新的博客&#xff0c;建议收藏&#xff0c;编写shell时遇到问题可以随时查阅。 1.Shell类型 shell是用C语言编写的程序&#xff0c;作为命令解释器连接着用户和操作系统内核。常见的shell有sh(Bourne She…

MATLAB使用多个扇形颜色变化表示空间一个点的多种数值

MATLAB使用多个扇形颜色变化表示空间一个点的多种数值 excel中表格中数据格式&#xff0c;多行 lonlatdata1data2data3117380.11100 clear;close all; figure(Position,[100 100 800 800]);num_points 14; [num,txt,raw] xlsread(test.xlsx); x num(:,1); y num(:,2);d…

CAD精简多段线顶点、优化、删除多余、重复顶点——CAD c#二次开发

附部分代码如下: public static void Pl精简(){Document doc Autodesk.AutoCAD.ApplicationServices.Application.DocumentManager.MdiActiveDocument;Database db doc.Database;Editor ed doc.Editor;var plOrigon db.SelectCurve("\n选择多段线&#xff1a;");…

输电线路的“智慧之眼”:全天候可视化监测如何赋能电网安全运维

在电力需求持续攀升、电网规模日益庞大的今天&#xff0c;输电线路的安全稳定运行面临着前所未有的挑战。线路跨越地形复杂多变&#xff0c;尤其是在偏远山区、铁路沿线及恶劣天气条件下&#xff0c;传统的人工巡检方式显得力不从心——效率低、风险高、覆盖有限。如何实现更智…

两阶段法目标检测发展脉络

模式识别期末展示大作业&#xff0c;做个记录&#xff0c;希望大家喜欢。 R-CNN Fast R-CNN R-FCN 整个过程可以分解为以下几个步骤&#xff1a; 输入图像 (image) 和初步特征提取 (conv, feature maps)&#xff1a; 首先&#xff0c;输入一张原始图像&#xff0c;经过一系列…

小白的进阶之路系列之六----人工智能从初步到精通pytorch数据集与数据加载器

本文将介绍以下内容: 数据集与数据加载器 数据迁移 如何建立神经网络 数据集与数据加载器 处理数据样本的代码可能会变得混乱且难以维护;理想情况下,我们希望我们的数据集代码与模型训练代码解耦,以获得更好的可读性和模块化。PyTorch提供了两个数据原语:torch.utils…

NestJS——重构日志、数据库、配置

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

c++数据结构8——二叉树的性质

一、二叉树的基本性质 示图1&#xff1a; 性质1&#xff1a;层节点数上限 在一棵二叉树中&#xff0c;第i层至多有2^{i-1}个节点&#xff08;首层是第1层&#xff09; 这个性质可以通过数学归纳法证明&#xff1a; 第1层&#xff1a;2^{1-1}2^01个节点&#xff08;根节点&am…

Window Server 2019--08 网络负载均衡与Web Farm

本章要点 1、了解网络负载均衡技术 2、掌握Web Farm核心原理 3、掌握如何使用Windows NLB搭建Web Farm环境 网络负载均衡技术将外部计算机发送的连接请求均匀的分配到服务器集群中的每台服务器上&#xff0c;接受到请求的服务器独立地响应客户的请求。 网络负载均衡技术还…

SpringBoot:统一功能处理、拦截器、适配器模式

文章目录 拦截器什么是拦截器&#xff1f;为什么要使用拦截器&#xff1f;拦截器的使用拦截路径执行流程典型应用场景DispatcherServlet源码分析 适配器模式适配器模式定义适配器模式角色适配器模式的实现适配器模式应用场景 统⼀数据返回格式优点 统一处理异常总结 拦截器 什…

AI Agent工具全景解析:从Coze到RAGflow,探索智能体自动化未来!

在人工智能技术持续深入行业应用的背景下&#xff0c;越来越多的企业和个人寻求通过自动化技术来提高效率和减少重复性劳动&#xff0c;AI Agent的崛起已经成为了不可忽视的趋势。AI Agent&#xff0c;即人工智能代理&#xff0c;是一种基于先进的人工智能技术&#xff0c;特别…

Onvif协议:IPC客户端开发-IPC相机控制(c语言版)

前言&#xff1a; 本博文主要是借鉴OceanStar大神的博文&#xff0c;在他的博文的基础之上做了一部分修改与简化。 博文链接&#xff1a; Onvif协议&#xff1a;IPC客户端开发之鉴权_onvif鉴权方式-CSDN博客 Onvif协议&#xff1a;IPC客户端开发之PTZ控制_onvif ptz-CSDN博客…

如何最简单、通俗地理解Pytorch?神经网络中的“梯度”是怎么自动求出来的?PyTorch的动态计算图是如何实现即时执行的?

PyTorch是一门科学——现代深度学习工程中的一把锋利利器。它的简洁、优雅、强大,正在让越来越多的AI研究者、开发者深度应用。 1. PyTorch到底是什么?为什么它重要? PyTorch是一个开源的深度学习框架,由Facebook AI Research(FAIR)于2016年发布,它的名字由两个部分组成…

QT+opecv如何更改图片的拍摄路径

如何更改相机拍摄图片的路径 前言&#xff1a;基础夯实&#xff1a;效果展示&#xff1a;实现功能&#xff1a;遇到问题&#xff1a;未解决&#xff1a; 核心代码&#xff1a; 前言&#xff1a; 最近在项目开发中遇到需要让用户更改相机拍摄路径的问题&#xff0c;用户可自己选…

秋招Day11 - JVM - 类加载机制

了解类的加载机制吗&#xff1f; JVM是运行Java字节码&#xff0c;也就是运行.class文件的虚拟机&#xff0c;JVM把.class文件中描述类的数据结构加载到内存中&#xff0c;并对数据进行校验&#xff0c;解析和初始化&#xff0c;最终转化为JVM可以使用的类型&#xff08;Klass…

Webug4.0靶场通关笔记03- 第3关SQL注入之时间盲注(手注法+脚本法 两种方法)

目录 一、源码分析 1.分析闭合 2.分析输出 &#xff08;1&#xff09;查询成功 &#xff08;2&#xff09;查询失败 &#xff08;3&#xff09;SQL语句执行报错 二、第03关 延时注入 1.打开靶场 2.SQL手注 &#xff08;1&#xff09;盲注分析 &#xff08;2&#xf…

Vert.x学习笔记-什么是Handler

Vert.x学习笔记 在Vert.x中&#xff0c;Handler是一个核心概念&#xff0c;用于处理异步事件和回调。它是Vert.x响应式编程模型的核心组件之一&#xff0c;通过函数式接口的方式简化了异步编程的复杂性。 1. Handler的定义 Handler是一个函数式接口&#xff0c;定义如下&#…

【Echarts】象形图

目录 效果代码 效果 代码 <!-- 业务类型 --> <template><div class"ywlx" :style"{ --height: height }"><div class"header_count count_linear_bg"><div>当月业务总量<span class"common_count text_s…

集星云推短视频矩阵系统的定制化与私有化部署方案

在当今数字化营销时代&#xff0c;短视频矩阵系统成为众多企业和机构拓展影响力、实现精准营销的关键工具。集星云推短视频矩阵系统凭借其强大的功能和灵活的定制性&#xff0c;为企业提供了全方位的解决方案。 一、API接口定制&#xff1a;无缝对接自有系统 集星云推短视频矩…

XCTF-web-file_include

解析 <?php highlight_file(__FILE__); // 高亮显示当前PHP文件源代码 include("./check.php"); // 包含检查文件&#xff08;可能包含安全过滤逻辑&#xff09;if(isset($_GET[filename])) { // 检查是否传入filename参数$filename $_GET[f…