docker-compose配置文件
# Single-node Kafka stack: ZooKeeper, one Kafka broker and a Kafka Manager UI.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Two plaintext listeners: PLAINTEXT on 9092 (the published port) and
      # PLAINTEXT_INTERNAL on 29092 (not published, reachable as kafka:29092).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT"
      KAFKA_LISTENERS: "PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092"
      # NOTE(review): 192.168.1.85 is presumably the Docker host's LAN address;
      # clients connect back to whatever address is advertised here, so it must
      # be reachable from them. Adjust when deploying on another machine.
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT_INTERNAL"
      # Only one broker is defined, so the offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

  kafka-manager:
    image: hlebalbau/kafka-manager:stable
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      # UI is served on container port 9000 and published on host port 9002.
      - "9002:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"
      # Environment variable names are case-sensitive; spelled in upper case
      # like the other variables.
      # NOTE(review): confirm this image actually reads KAFKA_BROKERS.
      KAFKA_BROKERS: "192.168.1.85:9092"
      KAFKA_MANAGER_AUTH_ENABLED: "false"
application.properties文件
# application.properties
spring.application.name=kafka_demo

# Kafka broker address used by both the producer and the consumer
spring.kafka.bootstrap-servers=192.168.1.85:9092

# Producer: keys and values are sent as plain strings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Consumer: default group, start from the earliest offset when none is stored,
# and read keys and values as plain strings
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
生产者controller
import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller exposing a test endpoint that hands a message to
 * {@link KafkaProducerService} for publishing.
 */
@RestController
public class KafkaTestController {

    private final KafkaProducerService kafkaProducerService;

    /**
     * @param kafkaProducerService service used to publish messages
     */
    public KafkaTestController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    /**
     * Handles {@code GET /send}: passes the message to the producer service and
     * returns a confirmation text.
     *
     * <p>The producer call returns {@code void}, so the response does not
     * reflect whether the send succeeded.
     *
     * @param topic   target topic; defaults to {@code test-topic}
     * @param message message payload; defaults to {@code Hello Kafka!}
     * @return confirmation text containing the message and the topic
     */
    @GetMapping("/send")
    public String sendMessageToKafka(
            @RequestParam(value = "topic", defaultValue = "test-topic") String topic,
            @RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {
        kafkaProducerService.sendMessage(topic, message);
        return String.format("消息已发送: %s 到主题: %s", message, topic);
    }
}
生产者service
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * Publishes string messages to Kafka through a {@link KafkaTemplate}.
 */
@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param kafkaTemplate template used to send records
     */
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message to the given topic and reports the outcome from a
     * completion callback: partition and offset go to standard output on
     * success, the exception message goes to standard error on failure.
     *
     * @param topic   topic name
     * @param message message content
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message).whenComplete((sendResult, failure) -> {
            // Failure path first; the success path below needs a non-null result.
            if (failure != null) {
                System.err.println("消息发送失败: " + failure.getMessage());
                return;
            }
            var metadata = sendResult.getRecordMetadata();
            System.out.println("消息发送成功: " + message
                    + ", 分区: " + metadata.partition()
                    + ", 偏移量: " + metadata.offset());
        });
    }
}
消费者监听
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
 * Kafka listener that prints every record it receives.
 */
@Service
public class KafkaConsumerService {

    /**
     * Listens on the topic given by {@code kafka.topic} (default
     * {@code test-topic}) in the consumer group given by {@code kafka.group-id}
     * (default {@code test-group}) and prints topic, partition, offset, key
     * and value of each record to standard output.
     *
     * @param record the received record
     */
    @KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")
    public void consumeMessage(ConsumerRecord<String, String> record) {
        String line = String.format(
                "收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",
                record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
        System.out.print(line);
        // Business-logic processing can be added here.
    }
}
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- The Spring Boot parent supplies dependency and plugin version management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.yykj</groupId>
    <artifactId>kafka_demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_demo</name>
    <description>kafka_demo</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring for Apache Kafka. No explicit version: it is managed by the
             Spring Boot parent, which keeps it aligned with the other Spring
             libraries when the parent version is upgraded. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Run Lombok as an annotation processor during compilation. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <!-- Lombok is only needed at compile time, so it is excluded from the packaged application. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
创建 TiCDC changefeed(将 TiDB 的数据变更同步到 Kafka):
# Create the TiCDC changefeed "kafka-debezium" on the server at 192.168.1.85:8300,
# writing to the Kafka topic test-tidbmessage on 192.168.1.85:9092 in the debezium
# protocol, with further changefeed options read from config-cdc.yml.
tiup cdc cli changefeed create \
  --server=http://192.168.1.85:8300 \
  --changefeed-id="kafka-debezium" \
  --sink-uri="kafka://192.168.1.85:9092/test-tidbmessage?protocol=debezium&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
  --config=config-cdc.yml
config-cdc.yml配置(文件内容为 TOML 格式)
# Top-level changefeed option.
# NOTE(review): presumably enables replication of tables without a valid index — confirm against the TiCDC docs.
force-replicate = true

[filter]
# Replicate only these three tables of the law database.
rules = [
  'law.sys_dict',
  'law.sys_user',
  'law.sys_role'
]