Debezium嵌入式连接postgresql封装服务

news2025/5/12 22:35:47

文章目录

      • 1.项目结构:
      • 2.依赖:
      • 3.application.properties
      • 4.DebeziumConnectorConfig类
      • 5.TableEnum类
      • 6.TableHandler接口(表处理抽象)
      • 7.DefaultTableHandler默认实现类
      • 8.UserTableHandler处理类
      • 9.TableHandlerFactory工厂
      • 10.DebeziumListener 监听事件
      • 11.测试

环境:JDK8,Debezium1.94,postgresql12

1.项目结构:

在这里插入图片描述

2.依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.7.18</version>
    </parent>

    <groupId>com.linging</groupId>
    <artifactId>springboot-debezium-server</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <debezium.version>1.9.4.Final</debezium.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-reload4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-postgres</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.43</version>
        </dependency>

    </dependencies>

</project>

3.application.properties

# Debezium Configuration
#连接器基本信息
#指定 Debezium 连接器的名称,唯一标识,用于在 Kafka Connect 中区分不同的连接器。
debezium.name=my-postgres-connector
#指定连接器的类名,这里是连接postgresql
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector

#偏移量相关
#指定偏移量存储的实现类,这里使用的是文件存储,将偏移量存储在本地文件中。
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
#处理数据的偏移量存储路径
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
#处理数据的偏移量提交时间间隔,单位毫秒,设置为 0 表示每次处理完一批记录后立即提交偏移量,这可以减少数据丢失的风险,但会增加系统开销
#以上一次提交时间开始计算
debezium.offset.flush.interval.ms=10000

#数据库连接信息
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
#捕获变更的数据库名称
debezium.database.dbname=db_test
#指定逻辑服务器的唯一标识,用于区分不同的数据库实例
debezium.database.server.id=postgresql_0
#指定逻辑服务器的名称,用于在 Kafka 主题中区分不同的数据库实例
debezium.database.server.name=customer_postgres_db_server

#数据库历史记录
#指定数据库模式记录的实现类,这里使用的是文件存储,将数据库历史记录存储在本地文件中
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
#指定数据库模式记录文件的路径
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat

#表和字段过滤
#debezium.table.include.list=public.user
#debezium.column.include.list=public.user.id,public.user.name

#其他配置
#指定是否自动创建 PostgreSQL 的逻辑复制槽,设置为 filtered 表示只捕获配置中指定的表和字段的变更
debezium.publication.autocreate.mode=filtered
#指定 PostgreSQL 的逻辑复制插件名称,pgoutput是 PostgreSQL 的逻辑复制插件名称,用于捕获变更
debezium.plugin.name=pgoutput
#指定逻辑复制槽的名称
debezium.slot.name=dbz_customerdb_listener
#不执行初始快照,直接捕获变更数据,取值:never、initial、when_needed
debezium.snapshot.mode=never
#批量提交条数
debezium.max.batch.size=100

logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
logging.level.io.debezium=INFO

4.DebeziumConnectorConfig类

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public Properties customerConnector(Environment env) {
        Properties props = new Properties();
        props.setProperty("name", env.getProperty("debezium.name"));
        props.setProperty("connector.class", env.getProperty("debezium.connector.class"));
        props.setProperty("offset.storage", env.getProperty("debezium.offset.storage"));
        props.setProperty("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"));
        props.setProperty("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"));
        props.setProperty("database.hostname", env.getProperty("debezium.database.hostname"));
        props.setProperty("database.port", env.getProperty("debezium.database.port"));
        props.setProperty("database.user", env.getProperty("debezium.database.user"));
        props.setProperty("database.password", env.getProperty("debezium.database.password"));
        props.setProperty("database.dbname", env.getProperty("debezium.database.dbname"));
        props.setProperty("database.server.id", env.getProperty("debezium.database.server.id"));
        props.setProperty("database.server.name", env.getProperty("debezium.database.server.name"));
        props.setProperty("database.history", env.getProperty("debezium.database.history"));
        props.setProperty("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"));
        props.setProperty("table.include.list", TableEnum.getTableNames()); //表名
        props.setProperty("column.include.list", TableEnum.getColumns()); // 表中得哪些字段
        props.setProperty("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"));
        props.setProperty("plugin.name", env.getProperty("debezium.plugin.name"));
        props.setProperty("slot.name", env.getProperty("debezium.slot.name"));
        props.setProperty("snapshot.mode", env.getProperty("debezium.snapshot.mode"));
        props.setProperty("max.batch.size", env.getProperty("debezium.max.batch.size"));
        return props;
    }
}

5.TableEnum类

package com.linging.enums;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * 监听的表及字段配置
 * @author Linging
 * @version 1.0.0
 * @since 1.0
 */
public enum TableEnum {
    DEFAULT("default", "defaultTableHandler", null),
    USER("public.user", "userTableHandler", "public.user.id,public.user.name"),
    ;
    // 表名称
    private final String tableName;

    // 表处理类的名称
    private final String handlerName;

    // 表的字段名称,多个用逗号隔开
    public final String columnName;

    TableEnum(String tableName, String handlerName, String columnName) {
        this.tableName = tableName;
        this.handlerName = handlerName;
        this.columnName = columnName;
    }

    public String getTableName() {
        return tableName;
    }

    public String getHandlerName() {
        return handlerName;
    }

    public String getColumnName() {
        return columnName;
    }

    public static String getTableNames(){
        return Arrays.stream(TableEnum.values()).map(TableEnum::getTableName)
                .filter(name -> !"default".equals(name)).distinct().collect(Collectors.joining(","));
    }

    public static String getColumns(){
        return Arrays.stream(TableEnum.values())
                .filter(e -> !"default".equals(e.getTableName()) && e.getColumnName() != null)
                .map(TableEnum::getColumnName).distinct().collect(Collectors.joining(","));
    }
}

6.TableHandler接口(表处理抽象)

public interface TableHandler {
    void handle(SourceRecord sourceRecord);
    void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer);
}

7.DefaultTableHandler默认实现类

/**
 * 默认处理类
 */
@Component("defaultTableHandler")
public class DefaultTableHandler implements TableHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultTableHandler.class);

    @Override
    public void handle(SourceRecord sourceRecord) {
        log.info("Handling default table: {}", sourceRecord.topic());
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
    }

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {
        log.info("Handling batch default table: {}", recordChangeEvents.size());
    }
}

8.UserTableHandler处理类

/**
 * user表变更处理类
 */
@Component("userTableHandler")
public class UserTableHandler implements TableHandler {
    private static final Logger log = LoggerFactory.getLogger(UserTableHandler.class);

    @Override
    public void handle(SourceRecord sourceRecord) {
        log.info("Handling user table: {}", sourceRecord.topic());
        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        // 添加具体的处理逻辑
        Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
        if (sourceRecordChangeValue != null) {
            Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            // 处理非读操作
            if(operation != Envelope.Operation.READ) {
                String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;

                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));

                // TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)
                log.info("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {
        for (RecordChangeEvent<SourceRecord> recordChangeEvent : recordChangeEvents) {
            try {
                SourceRecord sourceRecord = recordChangeEvent.record();

                // TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)
                this.handle(sourceRecord);

                // 标记已处理
                committer.markProcessed(recordChangeEvent);
            } catch (InterruptedException e) {
                log.error("处理异常:", e);
            }
        }
    }
}

9.TableHandlerFactory工厂

@Component
public class TableHandlerFactory implements ApplicationContextAware {

    @Value("${debezium.database.server.name}")
    private String prefixServerName;

    private ApplicationContext context;

    private final Map<String, TableHandler> handlers = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @PostConstruct
    public void init(){
        String name = null;
        for (TableEnum tableEnum : TableEnum.values()) {
            if(TableEnum.DEFAULT.equals(tableEnum)){
                name = tableEnum.getTableName();
            }else{
                name = getTableName(tableEnum.getTableName());
            }
            handlers.putIfAbsent(name,
                    context.getBean(tableEnum.getHandlerName(), TableHandler.class));
        }
    }

    public TableHandler getHandler(String tableName) {
        return handlers.getOrDefault(tableName, handlers.get(TableEnum.DEFAULT.getTableName()));
    }

    public String getTableName(String name){
        return prefixServerName + "." + name;
    }

}

10.DebeziumListener 监听事件

@Component
public class DebeziumListener {
    private static final Logger log = LoggerFactory.getLogger(DebeziumListener.class);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
    private final TableHandlerFactory tableHandlerFactory;

    @Autowired
    public DebeziumListener(Properties customerConnector, TableHandlerFactory tableHandlerFactory) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(customerConnector)
                .using(OffsetCommitPolicy.periodic(customerConnector))
                .notifying(this::handleChangeEventBatch)
                .build();
        this.tableHandlerFactory = tableHandlerFactory;
    }

    /**
     * 批量记录处理
     * @param recordChangeEvents
     * @param committer
     */
    private void handleChangeEventBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,
                                        DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer){
        // 根据表分组
        Map<String, List<RecordChangeEvent<SourceRecord>>> tableName2List = recordChangeEvents
                .stream().collect(Collectors.groupingBy(event -> event.record().topic()));

        tableName2List.forEach((tableName, recordChangeEventList) -> {
            TableHandler handler = tableHandlerFactory.getHandler(tableName);
            handler.handleBatch(recordChangeEventList, committer);
        });
        try {
            // 触发提交策略
            committer.markBatchFinished();
        } catch (InterruptedException e) {
            log.error("提交异常:", e);
        }
    }

    /**
     * 单条记录处理
     * @param sourceRecordChangeEvent
     */
    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordChangeEvent.record();

        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        String tableName = sourceRecord.topic();

        // 获取对应的表处理类
        TableHandler handler = tableHandlerFactory.getHandler(tableName);
        handler.handle(sourceRecord);
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() {
        if (this.debeziumEngine != null) {
            try {
                this.debeziumEngine.close();
            } catch (IOException e) {
                log.error("关闭debeziumEngine异常:", e);
            }
        }
        this.executor.shutdown();
    }
}

11.测试

启动服务,修改数据库user表数据:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2332129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python 爬取 1688.item_get_factory 接口:获取工厂档案信息实战指南

在电商采购和供应链管理中&#xff0c;了解供应商的工厂信息是至关重要的一步。1688 作为国内领先的 B2B 平台&#xff0c;提供了丰富的供应商和工厂档案信息。通过 item_get_factory API 接口&#xff0c;开发者可以获取工厂的详细信息&#xff0c;包括工厂名称、地址、联系方…

Rust所有权详解

文章目录 Rust所有权所有权规则作用域 内存和分配移动与克隆栈空间堆空间 关于函数的所有权机制作为参数作为返回值 引用与租借垂悬引用 Rust所有权 C/C中我们对于堆内存通常需要自己手动管理&#xff0c;手动申请和释放&#xff0c;即便有了智能指针&#xff0c;对于效率的影…

CExercise_07_1指针和数组_2数组元素的逆序数组逆序(指针版 reverse_by_ptr 和下标版 reverse_arr)

题目&#xff1a; 数组元素的逆序。要求使用[]运算符以及纯粹指针操作两种方式来完成。 关键点 arr[i] arr[len - 1 - i]; arr[0]arr[len-1]; 如果数组序列是偶数,则调换最中间一对为止;若为奇数,则单出一个不用反转. 思想就是长度取一半 eg:8/2, 9/24.5,反转一半,到5时固定…

框架PasteForm实际开发案例,换个口味显示数据,支持echarts,只需要标记几个特性即可在管理端显示(2)

PasteForm框架的主要思想就是对Dto进行标记特性,然后管理端的页面就会以不一样的UI呈现 使用PasteForm框架开发,让你免去开发管理端的烦恼,你只需要专注于业务端和用户端! 在管理端中,如果说表格是基本的显示方式,那么图表chart就是一个锦上添花的体现! 如果一个项目拥…

Starrocks的Bitmap索引和Bloom filter索引以及全局字典

写这个的主要作用是梳理一下Starrocks的索引效率以及使用场景。 Starrocks Bitmap索引 原理&#xff1a; Bitmap 索引是一种使用 bitmap 的特殊数据库索引。bitmap 即为一个 bit 数组&#xff0c;一个 bit 的取值有两种&#xff1a;0 或 1。 每一个 bit 对应数据表中的一行&…

QML面试笔记--UI设计篇05容器控件

1. QML中容器控件全解&#xff1a;构建灵活界面的基石 1.1. Item&#xff08;万物容器&#xff09;1.2. Rectangle&#xff08;视觉容器&#xff09;1.3. ListView&#xff08;动态列表容器&#xff09;1.4. Frame&#xff08;表单容器&#xff09;1.5. SwipeView&#xff08;页…

VSCode运行,各类操作缓慢,如何清理

VSCode写代码&#xff0c;随着项目逐步进展&#xff0c;代码量在增加&#xff0c;依赖的第三方头文件也在增加&#xff0c; 先是发现代码提示的速度变慢&#xff0c; 后来格式化代码速度太慢 然后c/c代码的语法检查有时候压根就失败&#xff0c;来个错误提示 还有source contro…

redis(2)-mysql-锁

1.数据倾斜&#xff1a; 解决&#xff1a;虚拟节点 2.缓存穿透&#xff1a;缓存雪崩、击穿 3.分布式锁 多把锁控制不同节点上的一致性问题。 锁是有失效时间的。 强制回收。 4.redis 和zookeeper的区别 redis 数据支持有效期 4.1 zookeeper 分布式一致性服务框架&am…

OpenLayers:海量图形渲染之矢量切片

最近由于在工作中涉及到了海量图形渲染的问题&#xff0c;因此我开始研究相关的解决方案。在咨询了许多朋友之后发现矢量切片似乎是行业内最常用的一种解决方案&#xff0c;于是我便开始研究它该如何使用。 一、什么是矢量切片 矢量切片按照我的理解就是用栅格切片的方式把矢…

AI智算-K8s+vLLM Ray:DeepSeek-r1 671B 满血版分布式推理部署实践

K8s + vLLM & Ray:DeepSeek-r1 671B 满血版分布式推理部署实践 前言环境准备1. 模型下载2. 软硬件环境介绍正式部署1. 模型切分2. 整体部署架构3. 安装 LeaderWorkerSet4. 通过 LWS 部署DeepSeek-r1模型5. 查看显存使用率6. 服务对外暴露7. 测试调用API7.1 通过 curl7.2 通…

深入浅出SPI通信协议与STM32实战应用(W25Q128驱动)(实战部分)

1. W25Q128简介 W25Q128 是Winbond推出的128M-bit&#xff08;16MB&#xff09;SPI接口Flash存储器&#xff0c;支持标准SPI、Dual-SPI和Quad-SPI模式。关键特性&#xff1a; 工作电压&#xff1a;2.7V~3.6V分页结构&#xff1a;256页/块&#xff0c;每块16KB&#xff0c;共1…

前端知识点---闭包(javascript)

文章目录 1.怎么理解闭包?2.闭包的特点3.闭包的作用?4 闭包注意事项&#xff1a;5 形象理解6 闭包的应用 1.怎么理解闭包? 函数里面包着另一个函数&#xff0c;并且内部函数可以访问外部函数的变量。 <script> function box() {//周围状态&#xff08;外部函数中定义的…

Java 泛型的逆变与协变:深入理解类型安全与灵活性

泛型是 Java 中强大的特性之一&#xff0c;它提供了类型安全的集合操作。然而&#xff0c;泛型的类型关系&#xff08;如逆变与协变&#xff09;常常让人感到困惑。 本文将深入探讨 Java 泛型中的逆变与协变&#xff0c;帮助你更好地理解其原理和应用场景。 一、什么是协变与…

多线程(进阶)(内涵面试题)

目录 一、常见的锁策略 1. 悲观锁 vs 乐观锁 2. 重量级锁 vs 轻量级锁 3. 挂起等待锁 vs 自旋锁 4. 普通互斥锁 vs 读写锁 5. 可重入锁 vs 不可重入锁 6. 公平锁 vs 非公平锁 7. 面试题 二、synchronized的原理 1. 基本特点 2. 加锁工作过程 1&#xff09;偏向锁&am…

蓝桥杯补题

方法技巧&#xff1a; 1.进行循环暴力骗分&#xff0c;然后每一层的初始进行判断&#xff0c;如果已经不满足题意了&#xff0c;那么久直接continue&#xff0c;后面的循环就不用浪费时间了。我们可以把题目所给的等式&#xff0c;比如说有四个未知量&#xff0c;那么我们可以用…

【MySQL篇】mysqlpump和mysqldump参数区别总汇

&#x1f4ab;《博主主页》&#xff1a;奈斯DB-CSDN博客 &#x1f525;《擅长领域》&#xff1a;擅长阿里云AnalyticDB for MySQL(分布式数据仓库)、Oracle、MySQL、Linux、prometheus监控&#xff1b;并对SQLserver、NoSQL(MongoDB)有了解 &#x1f496;如果觉得文章对你有所帮…

SQL:DDL(数据定义语言)和DML(数据操作语言)

目录 什么是SQL&#xff1f; 1. DDL&#xff08;Data Definition Language&#xff0c;数据定义语言&#xff09; 2. DML&#xff08;Data Manipulation Language&#xff0c;数据操作语言&#xff09; DDL和DML的区别 什么是SQL&#xff1f; SQL&#xff08;Structured …

神舟平板电脑怎么样?平板电脑能当电脑用吗?

在如今的数码产品市场上&#xff0c;神舟平板电脑会拥有独特的优势&#xff0c;其中比较受到大家关注的就是神舟PCpad为例&#xff0c;无论是设计还是规格也会有很多的亮点&#xff0c;那么是不是可以直接当成电脑一起来使用呢&#xff1f; 这款平板电脑就会配备10.1英寸显示屏…

【力扣hot100题】(075)数据流的中位数

一开始只建立了一个优先队列&#xff0c;每次查询中位数时都要遍历一遍于是喜提时间超限&#xff0c;看了答案才恍然大悟原来还有这么聪明的办法。 方法是建立两个优先队列&#xff0c;一个大根堆一个小根堆&#xff0c;大根堆记录较小的数&#xff0c;小根堆记录较大的数。 …

Java——pdf增加水印

文章目录 前言方式一 itextpdf项目依赖引入编写PDF添加水印工具类测试效果展示 方式二 pdfbox依赖引入编写实现类效果展示 扩展1、将inputstream流信息添加水印并导出zip2、部署出现找不到指定字体文件 资料参考 前言 近期为了知识库文件导出&#xff0c;文件数据安全处理&…