Flink CDC将MySQL数据同步到数据湖

news2025/6/4 14:57:10

此项目可以理解为MySQL数据迁移,由Flink Stream监听MySQL的Binlog日志写入Kafka,在Kafka消费端将消息写入Doris或其他外部对象存储。
涉及的环境与版本

组件版本
flink1.20.1
flink-cdc3.4.0
kafka2.13-4.0.0
Dragonwell17

引入相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>etl</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.20.1</flink.version>
        <flink-cdc.version>3.4.0</flink-cdc.version>
        <kafka-clients.version>3.3.1</kafka-clients.version>
        <fastjson.version>1.2.83</fastjson.version>
        <aliyun-sdk-oss.version>3.18.2</aliyun-sdk-oss.version>
        <lombok.version>1.18.30</lombok.version>
        <hadoop.version>3.3.6</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-clients.version}</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.oss</groupId>
            <artifactId>aliyun-sdk-oss</artifactId>
            <version>${aliyun-sdk-oss.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>1.12.3</version>
        </dependency>
    </dependencies>
</project>

主程序入口,flinck cdc监听mysql binlog

package org.example;

import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class ConditionalEventSync {
    public static void main(String[] args) throws Exception {
    	// 配置源mysql连接信息
        MySqlSource<String> source =
            MySqlSource.<String>builder().hostname("xxx").port(3306)
                .databaseList("xx").tableList("xx").username("xx")
                .password("xx").deserializer(new JsonDebeziumDeserializationSchema())
                    // 优化项
                    .splitSize(50)         // 表快照分片数(默认30)
                    .fetchSize(1024)       // 每次fetch行数(默认1024)
                    .connectTimeout(Duration.ofSeconds(30))
                    .connectionPoolSize(5) // 连接池大小(默认3)
                    .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").flatMap(new EventFilterFunction())
        // 将监听到的数据写入kafka
            .addSink(new KafkaSink("event_tracking"));
        env.setParallelism(4);
        env.enableCheckpointing(10000);
        // 避免因大数据量写入状态后端(如 RocksDB)导致 Checkpoint 超时
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // todo 大表快照可能导致频繁 Full GC,启动参数增大堆内存
        env.execute("Conditional  Event Tracking Sync");
    }

    public static class EventFilterFunction implements FlatMapFunction<String, String> {

        @Override
        public void flatMap(String json, Collector<String> out) {
            // JSONObject event = JSONObject.parseObject(json);
            // // 条件1:只同步特定类型
            // if (event.getIntValue("type") == 2) return;
            // // 条件2:过滤测试IP段
            // if (event.getString("ip").startsWith("192.168.")) return;
            out.collect(json);
        }
    }
}

将监听到的binlog日志写入kafka
kafka需要先创建对应的topic,UI客户端可以使用https://github.com/obsidiandynamics/kafdrop

package org.example;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaSink implements SinkFunction<String> {

    private transient KafkaProducer<String, String> producer;
    private final String topic;

    public KafkaSink(String topic) {
        this.topic = topic;
    }

    @Override
    public void invoke(String value, Context context) {
        if (producer == null) {
            producer = createKafkaProducer();
        }
        System.out.println("【KafkaSink】Sending event to Kafka.topic: "+topic+",body:" + value);
        producer.send(new ProducerRecord<>(topic, value));
    }

    private KafkaProducer<String, String> createKafkaProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // Kafka broker 地址
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public void close() throws Exception {
        if (producer != null) {
            producer.close();
        }
    }
}

监听到的binlog数据如下,根据op字段判断监听到的数据变更是新增、更新还是删除,消费端需要区分做对应的处理。

{
	"before": null,
	"after": {
		"id": 3,
		"type": 1,
		"tag": "pay_enter",
		"user_id": 23,
		"ip": null,
		"client": null,
		"create_time": 1744045915000
	},
	"source": {
		"version": "1.9.8.Final",
		"connector": "mysql",
		"name": "mysql_binlog_source",
		"ts_ms": 0,
		"snapshot": "false",
		"db": "linda_source",
		"sequence": null,
		"table": "event_tracking",
		"server_id": 0,
		"gtid": null,
		"file": "",
		"pos": 0,
		"row": 0,
		"thread": null,
		"query": null
	},
	"op": "r",
	"ts_ms": 1745309434361,
	"transaction": null
}

kafka消费端可以单独起个项目部署在其他服务器

package org.example;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;

public class KafkaConsumer {

    public static void main(String[] args) {
        // ===================kafka消费==================
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // Kafka broker 地址
        props.put("group.id", "test-group"); // 消费者组 ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
            new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("tool_event_tracking")); // 订阅 topic

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                        "【KafkaConsumer】Received message: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    handleMqEvent(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void handleMqEvent(String event) {
        System.out.println("handleMqEvent接收内容:" + event);
        JSONObject value = JSONObject.parseObject(event);
        String op = value.getString("op");// u:更新,r:新增,d:删除
        JSONObject before = value.getJSONObject("before");
        JSONObject after = value.getJSONObject("after");
        String userId = null;
        String path = null;
        switch (op) {
            case "c":
                // 新增
                saveToDoris(Lists.newArrayList(after.toJavaObject(EventTrackingEntity.class)));
                break;
            case "d":
                userId = before.getString("user_id");
                // 删除
                // todo
                break;
          case "u":
                userId = after.getString("user_id");
                // 更新
                // todo
                break;
        }
    }

    public static String saveToDoris(List<EventTrackingEntity> dataList) {
        String jdbcUrl = "jdbc:mysql://172.20.89.65:9030/devops";
        String username = "root";
        String password = "";

        String insertSQL =
            "INSERT INTO event_tracking (id, type, tag, user_id, ip, client, create_time) VALUES (?, ?, ?, ?, ?, ?, ?)";

        try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
            PreparedStatement ps = conn.prepareStatement(insertSQL)) {

            // 设置自动提交为 false,提高性能
            conn.setAutoCommit(false);

            for (EventTrackingEntity item : dataList) {
                ps.setLong(1, item.getId() != null ? item.getId() : 0);
                ps.setInt(2, item.getType() != null ? item.getType() : 0);
                ps.setString(3, item.getTag());
                ps.setLong(4, item.getUserId() != null ? item.getUserId() : 0);
                ps.setString(5, item.getIp());
                ps.setString(6, item.getClient());
                ps.setLong(7, item.getCreateTime().toEpochSecond(ZoneOffset.UTC));

                ps.addBatch();
            }

            int[] result = ps.executeBatch();
            conn.commit();

            System.out.println("批量插入完成,影响记录数:" + result.length);
            return "Success";

        } catch (SQLException e) {
            throw new RuntimeException("JDBC 写入 Doris 出错", e);
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2397113.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Mathematica观察多形式根的分布随参数的变化

有两种方式观察多项式的根随着参数变化&#xff1a;&#xff08;1&#xff09;直接制作一个小的动态视频&#xff1b;&#xff08;2&#xff09;绘制所有根形成的痕迹&#xff08;locus&#xff09;。 制作动态视频&#xff1a; (*Arg-plane plotting routine with plotting …

【C++高级主题】转换与多个基类

目录 一、多重继承的虚函数表结构&#xff1a;每个基类一个虚表 1.1 单继承与多重继承的虚表差异 1.2 代码示例&#xff1a;多重继承的虚函数覆盖 1.3 虚表结构示意图 二、指针与引用的类型转换&#xff1a;地址调整的底层逻辑 2.1 派生类指针转基类指针的地址偏移 2.2 …

『uniapp』添加桌面长按快捷操作 shortcuts(详细图文注释)

目录 手机环境适配说明安卓效果图代码 iOS(暂未实测,没有水果开发者)总结 欢迎关注 『uniapp』 专栏&#xff0c;持续更新中 欢迎关注 『uniapp』 专栏&#xff0c;持续更新中 手机环境适配说明 个别手机系统可能需要进行特别的权限设置,否则会无法使用 桌面快捷方式: 已知的有…

【LLM vs Agent】从语言模型到智能体,人工智能迈出的关键一步

目录 一、什么是 LLM&#xff1f;语言的天才&#xff0c;思维的起点 ✅ 特点小结&#xff1a; 二、什么是 Agent&#xff1f;智能的执行者&#xff0c;自主的决策者 ✅ 特点小结&#xff1a; 三、LLM 与 Agent 的关系&#xff1a;是工具&#xff0c;更是大脑 四、案例实战…

麦克风和电脑内播放声音实时识别转文字软件FunASR整合包V5下载

我基于FunASR制作的实时语音识别转文字软件当前更新到V5版本。软件可以实时识别麦克风声音和电脑内播放声音转为文字。 FunASR软件介绍 FunASR 是一款基础语音识别工具包和开源 SOTA 预训练模型&#xff0c;支持语音识别、语音活动检测、文本后处理等。 我使用FunASR制作了一…

PyTorch——卷积层(3)

conv_arithmetic/README.md at master vdumoulin/conv_arithmetic GitHub out_channel1 out_channel2

从 PyTorch 到 TensorFlow Lite:模型训练与推理

一、方案介绍 研发阶段&#xff1a;利用 PyTorch 的动态图特性进行快速原型验证&#xff0c;快速迭代模型设计。 灵活性与易用性&#xff1a;PyTorch 是一个非常灵活且易于使用的深度学习框架&#xff0c;特别适合研究和实验。其动态计算图特性使得模型的构建和调试变得更加直…

【存储基础】存储设备和服务器的关系和区别

文章目录 1. 存储设备和服务器的区别2. 客户端访问数据路径场景1&#xff1a;经过服务器处理场景2&#xff1a;客户端直连 3. 服务器作为"中转站"的作用 刚开始接触存储的时候&#xff0c;以为数据都是存放在服务器上的&#xff0c;服务器和存储设备是一个东西&#…

5.29打卡

浙大疏锦行 DAY 38 Dataset和Dataloader类 知识点回顾&#xff1a; 1. Dataset类的__getitem__和__len__方法&#xff08;本质是python的特殊方法&#xff09; 2. Dataloader类 3. minist手写数据集的了解 作业&#xff1a;了解下cifar数据集&#xff0c;尝试获取其中一张图…

【黑马程序员uniapp】项目配置、请求函数封装

黑马程序员前端项目uniapp小兔鲜儿微信小程序项目视频教程&#xff0c;基于Vue3TsPiniauni-app的最新组合技术栈开发的电商业务全流程_哔哩哔哩_bilibili 参考 有代码&#xff0c;还有app、h5页面、小程序的演示 小兔鲜儿-vue3ts-uniapp-一套代码多端部署: 小兔鲜儿-vue3ts-un…

PyTorch——DataLoader的使用

batch_size, drop_last 的用法 shuffle shuffleTrue 各批次训练的图像不一样 shuffleFalse 在第156step顺序一致

Predixy的docker化

概述 当前已有一套redis cluster的集群&#xff0c;但是fs中的hiredis只能配置单实例redis。 AI了一下方案&#xff0c;可以使用redis的proxy组件来实现从hiredis到redis cluster的互通。 代码地址&#xff1a;https://github.com/joyieldInc/predixy Predixy特性介绍&…

C++ 之 多态 【虚函数表、多态的原理、动态绑定与静态绑定】

目录 前言 1.多态的原理 1.1虚函数表 1.2派生类中的虚表 1.3虚函数、虚表存放位置 1.4多态的原理 1.5多态条件的思考 2.动态绑定与静态绑定 3.单继承和虚继承中的虚函数表 3.1单继承中的虚函数表 3.2多继承(非菱形继承)中的虚函数表 4.问答题 前言 需要声明的&#x…

【JavaWeb】Maven、Servlet、cookie/session

目录 5. Maven6. Servlet6.1 Servlet 简介6.2 HelloServlet6.3 Servlet原理6.4 Mapping( **<font style"color:rgb(44, 44, 54);">映射 ** )问题6.5 ServletContext6.6 HttpServletResponse<font style"color:rgb(232, 62, 140);background-color:rgb(…

Rust 编程实现猜数字游戏

文章目录 编程实现猜数字游戏游戏规则创建新项目默认代码处理用户输入代码解析 生成随机数添加依赖生成逻辑 比较猜测值与目标值类型转换 循环与错误处理优化添加循环优雅处理非法输入​ 最终完整代码核心概念总结 编程实现猜数字游戏 我们使用cargo和rust实现一个经典编程练习…

关于神经网络中的激活函数

这篇博客主要介绍一下神经网络中的激活函数以及为什么要存在激活函数。 首先&#xff0c;我先做一个简单的类比&#xff1a;激活函数的作用就像给神经网络里的 “数字信号” 加了一个 “智能阀门”&#xff0c;让机器能学会像人类一样思考复杂问题。 没有激活i函数的神经网络…

CentOS_7.9 2U物理服务器上部署系统简易操作步骤

近期单位网站革新&#xff0c;鉴于安全加固&#xff0c;计划将原有Windows环境更新到Linux-CentOS 7.9&#xff0c;这版本也没的说&#xff08;绝&#xff09;了&#xff08;版&#xff09;官方停止更新&#xff0c;但无论如何还是被sisi的牵挂着这一大批人&#xff0c;毕竟从接…

短视频平台差异视角下开源AI智能名片链动2+1模式S2B2C商城小程序的适配性研究——以抖音与快手为例

摘要 本文以抖音与快手两大短视频平台为研究对象&#xff0c;从用户群体、内容生态、推荐逻辑三维度分析其差异化特征&#xff0c;并探讨开源AI智能名片链动21模式与S2B2C商城小程序在平台适配中的创新价值。研究发现&#xff0c;抖音的流量中心化机制与优质内容导向适合品牌化…

【笔记】Windows 下载并安装 ChromeDriver

以下是 在 Windows 上下载并安装 ChromeDriver 的笔记&#xff1a; ✅ Windows 下载并安装 ChromeDriver 1️⃣ 确认 Chrome 浏览器版本 打开 Chrome 浏览器 点击右上角 ︙ → 帮助 → 关于 Google Chrome 记下版本号&#xff0c;例如&#xff1a;114.0.5735.199 2️⃣ 下载…

Spark-Core Project

RDD转换算子总结 RDD转换算子分为Value类型、双Value类型和Key - Value类型。 1、Value类型 map&#xff1a;对数据逐条映射转换&#xff0c;可改变数据类型或值。如 dataRDD.map(num > num * 2 运行结果&#xff1a; 2&#xff09;mapPartitions&#xff1a;以分区为单位处…