centos7.9使用docker-compose安装kafka

news2025/5/29 6:59:03

docker-compose配置文件

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      - "9002:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      kAFKA_BROKERS: 192.168.1.85:9092
      KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaTestController {

    private final KafkaProducerService kafkaProducerService;

    public KafkaTestController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    @GetMapping("/send")
    public String sendMessageToKafka(
            @RequestParam(value = "topic", defaultValue = "test-topic") String topic,
            @RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {

        kafkaProducerService.sendMessage(topic, message);
        return "消息已发送: " + message + " 到主题: " + topic;
    }
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * 发送消息到指定主题
     * @param topic 主题名称
     * @param message 消息内容
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        System.out.println("消息发送成功: " + message +
                                ", 分区: " + result.getRecordMetadata().partition() +
                                ", 偏移量: " + result.getRecordMetadata().offset());
                    } else {
                        System.err.println("消息发送失败: " + ex.getMessage());
                    }
                });
    }
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    // 监听指定的主题,groupId用于区分不同的消费者组
    @KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")
    public void consumeMessage(ConsumerRecord<String, String> record) {
        System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());

        // 这里可以添加你的业务逻辑处理
    }
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yykj</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>kafka_demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot Starter for Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.3.6</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

kafka的启动:

tiup cdc cli changefeed create   --server=http://192.168.1.85:8300   --changefeed-id="kafka-debezium"   --sink-uri="kafka://192.168.1.85:9092/test-tidbmessage?protocol=debezium&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"   --config=config-cdc.yml

config-cdc.yml配置

force-replicate=true
[filter]
# 只同步 law 数据库下的三张表
rules = ['law.sys_dict', 'law.sys_user', 'law.sys_role']

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2387193.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ETL 工具与数据中台的关系与区别

ETL 工具和数据中台作为数据处理领域的关键概念&#xff0c;虽然存在一定的关联&#xff0c;但二者有着明显的区别。本文将深入剖析 ETL 工具与数据中台之不同。 一、ETL 工具概述 ETL 是数据仓库技术中的核心技术之一&#xff0c;其全称为 Extract&#xff08;抽取&#xff…

SQLMesh Typed Macros:让SQL宏更强大、更安全、更易维护

在SQL开发中&#xff0c;宏&#xff08;Macros&#xff09;是一种强大的工具&#xff0c;可以封装重复逻辑&#xff0c;提高代码复用性。然而&#xff0c;传统的SQL宏往往缺乏类型安全&#xff0c;容易导致运行时错误&#xff0c;且难以维护。SQLMesh 引入了 Typed Macros&…

Docker 使用镜像[SpringBoot之Docker实战系列] - 第537篇

历史文章&#xff08;文章累计530&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 《…

解锁MCP:AI大模型的万能工具箱

摘要&#xff1a;MCP&#xff08;Model Context Protocol&#xff0c;模型上下文协议&#xff09;是由Anthropic开源发布的一项技术&#xff0c;旨在作为AI大模型与外部数据和工具之间沟通的“通用语言”。它通过标准化协议&#xff0c;让大模型能够自动调用外部工具完成任务&a…

Error in beforeDestroy hook: “Error: [ElementForm]unpected width “

使用 element 的 form 时候报错&#xff1a; vue.runtime.esm.js:3065 Error: [ElementForm]unpected width at VueComponent.getLabelWidthIndex (element-ui.common.js:23268:1) at VueComponent.deregisterLabelWidth (element-ui.common.js:23281:1) at Vue…

私有知识库 Coco AI 实战(七):摄入本地 PDF 文件

是否有些本地文件要检索&#xff1f;没问题。我们先对 PDF 类的文件进行处理&#xff0c;其他的文件往后稍。 Coco Server Token 创建一个 token 备用。 PDF_Reader 直接写个 python 程序解析 PDF 内容&#xff0c;上传到 Coco Server 就行了。还记得以前都是直接写入 Coco …

【Unity3D】将自动生成的脚本包含到C#工程文件中

我们知道&#xff0c;在用C#开发中&#xff0c;通过vs编辑器新建的脚本&#xff0c;会自动包含到vs工程中&#xff0c;而通过外部创建&#xff0c;比如复制别的工程或代码创建的C#脚本不会包含到vs工程。 在我们的日常开发中&#xff0c;通常会自动创建C#脚本&#xff0c;特别…

【Python 深度学习】1D~3D iou计算

一维iou 二维 import numpy as npdef iou_1d(set_a, set_b):# 获得集合A和B的边界 x1, x2 set_ay1, y2 set_b# 计算交集的上下界low max(x1,y1)high - min(x2, y2)# 计算交集if high - low < 0:inter 0else:inter high - low# 计算并集union (x2 -x1) (y2 - y1) - in…

java23

1.美化界面 添加背景图片 所以我们添加背景图片要放在后面添加 添加图片边框 绝对路径&#xff1a; 相对(模块)路径&#xff1a; 第一个是绝对路径&#xff0c;第二个是相对路径&#xff0c;但是斜杠的方向不对 总结&#xff1a; 2.图片移动 先实现KeyListener接口&#xf…

LitCTF2025 WEB

星愿信箱 使用的是python&#xff0c;那么大概率是ssti注入 测试{{5*5}} 发现需要包含文字&#xff0c;那么添加文字 可以看到被waf过滤了&#xff0c;直接抓包查看参数上fenjing 可以看到这里是json格式&#xff0c;其实fenjing也是支持json格式的 https://github.com/Marv…

Linux 下VS Code 的使用

这里以创建helloworld 为例。 Step 0:准备工作&#xff1a; Install Visual Studio Code. Install the C extension for VS Code. You can install the C/C extension by searching for c in the Extensions view (CtrlShiftX). Step 1: 创建工作目录 helloworld&#xff0…

Qt 布局管理器的层级关系

1、HomeWidget.h头文件&#xff1a; #ifndef HOMEWIDGET_H #define HOMEWIDGET_H#include <QWidget> #include <QPushButton> #include <QVBoxLayout> #include <QHBoxLayout>class HomeWidget : public QWidget {Q_OBJECTpublic:HomeWidget(QWidget …

maven模块化开发

使用方法 将项目安装到本地仓库 mvn install 的作用 运行 mvn install 时&#xff0c;Maven 会执行项目的整个构建生命周期&#xff08;包括 compile、test、package 等阶段&#xff09;&#xff0c;最终将构建的 artifact 安装到本地仓库&#xff08;默认路径为 ~/.m2/repos…

云原生安全之网络IP协议:从基础到实践指南

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 IP协议&#xff08;Internet Protocol&#xff09;是互联网通信的核心协议族之一&#xff0c;负责在设备间传递数据包。其核心特性包括&…

C++——QT 文件操作类

QFile 概述 QFile是Qt框架中用于文件操作的类&#xff08;位于QtCore模块&#xff09;&#xff0c;继承自 QIODevice&#xff0c;提供文件的读写、状态查询和路径管理功能。它与 QTextStream、QDataStream 配合使用&#xff0c;可简化文本和二进制数据的处理&#xff0c;并具备…

[spring] spring 框架、IOC和AOP思想

目录 传统Javaweb开发的困惑 loC、DI和AOP思想提出 Spring框架的诞生 传统Javaweb开发的困惑 问题一&#xff1a;层与层之间紧密耦合在了一起&#xff0c;接口与具体实现紧密耦合在了一起 解决思路&#xff1a;程序代码中不要手动new对象&#xff0c;第三方根据要求为程序提…

尚硅谷redis7 37-39 redis持久化之AOF简介

37 redis持久化之AOF简介 AOF 以日志的形式来记录每个写操作,将Redis执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工…

GitLab 备份所有仓库(自动克隆)

一、准备工作 1. 环境要求 已安装 Git&#xff08;版本 2.10&#xff09;本地磁盘空间充足&#xff08;根据仓库总大小预估&#xff09;已配置 SSH 密钥到 GitLab&#xff08;推荐方式&#xff09; 2. 获取 GitLab API 访问权限 登录 GitLab&#xff0c;点击右上角头像 → …

[浏览器]缓存策略机制详解

在做页面性能优化的时候&#xff0c;有一个点容易被忽略&#xff0c;那就是资源缓存优化。 浏览器里缓存策略分为强缓存&#xff0c;协商缓存以及不缓存&#xff0c;每个缓存策略都有其适用的优化场景。 下面为大家详解何为强缓存&#xff0c;协商缓存 先说结论强缓>协商&g…

OpenCV CUDA 模块图像过滤-----创建一个计算图像导数的滤波器函数createDerivFilter()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::cuda::createDerivFilter 是 OpenCV CUDA 模块中的一个工厂函数&#xff0c;用于创建一个计算图像导数的滤波器。这个滤波器可以用来计算图像…