09 flink-sql 中基于 mysql-cdc 的 select * from test_user 的具体实现

news2025/7/8 2:38:11

前言

这也是最近帮一个朋友看问题 遇到的一个问题 

然后 引发了一下 对于 flink-sql 里面的一些 常规处理的思考, 理解 

原始问题主要是 在测试库可以使用 flink-sql 可以正常同步, 但是 在生产环境 无法正常同步数据 

这个问题 我们后面单独 记录一篇文章 

87fe04f3239e4e768da72132e7774269.png

 

 

测试用例

下载 flink-1.13.6, 首先启动一个 standalone 的集群 

master:flink-1.13.6 jerry$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.

 

启动 flink sql-client 

master:flink-1.13.6 jerry$ ./bin/sql-client.sh 
Listening for transport dt_socket at address: 5007
No default environment specified.
Searching for '/Users/jerry/Downloads/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
Command history file path: /Users/jerry/.flink-sql-history

                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

 

创建 flink-sql 的表结构 

CREATE TABLE test_user (
`name` string,
`age` string,
PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'postgres',
'database-name' = 'test',
'table-name' = 'test_user'
);

 

源表数据如下 

d53a95f8b01a4676981c365726074779.png

 

然后 flink-sql 这边查询 结果如下 

1fbd5138ad994a5383fbcb31bd47d9f4.png

 

然后 我们来看一下 这里的整个处理流程 

 

 

flink-sql 中 select * from test_user 获取全量数据的调试

首先这里交互的角色抽象的可以理解为两个, 一个是 flink 集群, 一个是 flink sql-client 

然后 flink sql-client 这边组合查询, 相关业务, 然后创建一个 flink 任务, 抛给 flink 集群 

然后 两者进行交互, 首先是拿到 test_user 的快照全量数据, 然后 flink sql-client 这边做业务展示 

然后 test_user 的之后的增删改查, 的处理是基于 mysql binlog 这边来做增量处理 

我们这里 仅仅演示 test_user 的快照数据获取 以及 在 test_user 中增加一条记录, 然后 flink sql-client 这边是 怎么获取到的这个整个流程 

 

如下这里是 flink sql-client 这边将用户输入的 " select * from test_user " 转换为 flink sql 上下文的 operations, 然后封装成 pipeline, 提交给 flink 集群 

11a2df70ea0a4b589f0a9e7c2cfaef2a.png

 

 

看一下 flink 集群这边任务的执行, 首先是 第一次的全量数据的快照 

从 CollectSinkFunction 这边从 buffer 中获取到两条记录, 大致可以看出是 第一条记录 和 第二条记录, 然后 sendBackResults 通过 tcp交互, 将这两条数据对应的 StreamRecord 传输回 flink sql-client 

0609d767ecca4bce9d4605071734d1f9.png

 

往前回溯, 看一下 真正执行查询的地方, 执行的是 "select * from test_user;" 

然后这里迭代会将 查询的记录封装成为 SourceRecord, 然后添加到 recorderMarker 的 bufferedRecordQueue 中 

92e1ba3608324a0b847d9289c3c34dc1.png

 

然后这个 bufferedRecordQueue 是一个队列, 会将消耗的元素调用 enqueueRecord 将数据放入到 records 中 

6b48eaeadc2549758874c0f7b5bfcf25.png

 

这里是更细节的 enqueueRecord 的执行流程, 比如这里 迭代的事 第一条记录 

1c3973e8afe84d86af26ef340e11bb32.png

 

然后接着是 更上一层 Engine 的业务流程, 他会将 SnapshotReader 这边读取的记录更新到 batch 中 

351bda6f26bf4455b4d2f1adfb10a528.png

 

然后就是 Engine 这边的任务的执行, 将数据经过 map, filter, NotNull, 等等相关处理 

最终到达 CollectSinkFunction 这边 

1cf7e36ff8344e1f9391efc607f06186.png

 

然后 CollectSinkFunction 这边将数据封装成 GenericRowData, 然后序列化, 放入 buffer 队列 

然后 最终就是 CollectSinkFunction 上面的流程, 将序列化之后的数据通过 CollectCoordinationResponse 回传给 flink sql-client 

b0fa1585ee8c4f99b1aff634cf651099.png

 

 

然后 flink sql-client 这边的处理如下  

将拿到的数据, 添加到 buffer  队列 

3a6a2ea3c8344bd1be5fa3723ecb62f7.png

dc17460a9c7e409b8701bcf524ac4b56.png

 

然后就是 flink sql-client 这边的主线程的处理了, 从 buffer 中迭代 记录出来, 然后 放到 materializedTable, 然后 之后 cli 这边获取表格数据的时候, 将其传输到 snapshot 中 

494c6dd347c84e6a9548d88ba2d01e01.png

c677e808922f43cb83bc423ff641252b.png

 

 

flink sql-client 这边的展示流程如下 

1b2cf8d16f3b4cf097c9df6b07ef4f66.png

 

然后 做具体的展示, 展示结果如下, 然后 随着之后的迭代 能够获取完整当前页的数据, 展示在 cli 中 

f90e68011bb843cb9196cf2ca7002bfa.png

eae2ea9b78e54ea9a484632a45c7ab34.png

 

 

 

 

flink-sql 中 select * from test_user 获取增量数据的调试

增量数据的获取, 来自于 BinlogClient 这边的获取, 连接 mysql 的服务 

发送获取 binlog 的命令, 然后 之后 mysql 这边有 binlog 的事件之后, 会将相关 事件传递到 BinlogClient 这边 

比如这里 执行了一个 ”insert into test_user (`name`, age) select max(age)+1, max(age+1) from test_user;”, 增加了一条记录 (3, 3) 

然后这边 反序列化之后, 读取到 WriteRowsEvent 数据为 (3, 3) 

5ef9dc67350d489b91903f2abb0547c0.png

 

然后就是 BinlogClient 的后续流程, 将数据使用 recordMarker 记录 

和上面 SnapshotReader 这边处理一样, recorderMarker 会将 SourceRecord记录 添加到 records 列表, 由外层 Engine 层轮询 records 将其进行任务的执行, 到后面的 CollectSinkFunction 传输给 flink sql-client 这边做数据增删改查, 以及展示 

e596147a59714192b4bafcfa5f73d28a.png

 

为记录 (3, 3) 生成 SourceRecord 并放到 records 队列 

34bbed10370743b695fd3af2c99174be.png

 

Engine 层的处理, 其他的这里就不细化了 

f08ed652eda44ad39fc545c7290248f3.png

 

 

 

 

flink mysql-cdc MysqlConnectorTask 的处理 

我们可以看到上面 全量读取使用了 快照读, 然后增量的部分使用基于 binlog 来进行处理 

那么这个 处理流程是在这里呢? 

9da3b393683b4988be75f76ab669e9ef.png

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1569884.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++算法 —— 前缀和

一、【模版】前缀和 1.链接 【模板】前缀和_牛客题霸_牛客网 (nowcoder.com) 2.描述 3.思路 前缀和的思想其实就是一种简单的动态规划&#xff0c;以i位置记录从头位置到i位置的和&#xff0c;然后间接的求一段连续区间的数组和&#xff0c;时间复杂度是O&#xff08;n&…

20.2k stars项目搭建私人网盘界面美功能全

Nextcloud是一套用于创建网络硬盘的客户端&#xff0d;服务器软件。其功能与Dropbox相近&#xff0c;但Nextcloud是自由及开放源代码软件&#xff0c;每个人都可以在私人服务器上安装并执行它。 GitHub数据 20.2k stars561 watching3.2k forks 开源地址:https://github.com/ne…

保研线性代数复习3

一.基底&#xff08;Basis&#xff09; 1.什么是生成集&#xff08;Generating Set&#xff09;&#xff1f;什么是张成空间&#xff08;Span&#xff09;&#xff1f; 存在向量空间V(V&#xff0c;&#xff0c;*)&#xff0c;和向量集&#xff08;xi是所说的列向量&#xff…

H5面临的网络安全威胁和防范措施

H5&#xff0c;是基于HTML5技术的网页文件。HTML&#xff0c;全称Hyper Text Markup Language&#xff0c;即超文本标记语言&#xff0c;由Web的发明者Tim Berners-Lee与同事Daniel W. Connolly共同创立。作为SGML的一种应用&#xff0c;HTML编写的超文本文档能够独立于各种操作…

《机器学习算法面试宝典》正式发布!

大家好&#xff0c;历时半年的梳理和修改&#xff0c;《机器学习算法面试宝典》&#xff08;以下简称《算法面试宝典》&#xff09;终于可以跟大家见面了。 近年来&#xff0c;很多理科专业学生也纷纷转入算法赛道&#xff0c;特别是最近 ChatGPT 的爆火&#xff0c;推动了AI …

服务器硬件构成与性能要点:CPU、内存、硬盘、RAID、网络接口卡等关键组件的基础知识总结

文章目录 服务器硬件基础知识1. CPU&#xff08;中央处理器&#xff09;2. 内存&#xff08;RAM&#xff09;3. 硬盘4. RAID&#xff08;磁盘阵列&#xff09;5. 网络接口卡&#xff08;NIC&#xff09;6. 电源7. 散热器8. 主板9. 显卡10. 光驱 服务器硬件基础知识 服务器是一…

JDK类加载器剖析

0.前言 我之所以深入研究 Java 类加载器&#xff0c;是为了解决一个奇怪的问题。流行出版物&#xff0c;也就是人们所认为的 Java 世界的灯塔&#xff0c;充斥着关于这个主题的相互矛盾和过时的信息。这种矛盾引发了我的调查 — — 在 Java 类加载器的迷宫中寻求清晰的答案。 …

Oracle 数据库中的全文搜索

Oracle 数据库中的全文搜索 0. 引言1. 整体流程2. 创建索引2-1. 创建一个简单的表2-2. 创建文本索引2-3. 查看创建的基础表 3. 运行查询3-1. 运行文本查询3-2. CONTAINS 运算符3-3. 混合查询3-4. OR 查询3-5. 通配符3-6. 短语搜索3-7. 模糊搜索&#xff08;Fuzzy searches&…

【Servlet】Servlet入门

文章目录 一、介绍二、入门案例导入servlet-api的解决办法 一、介绍 概念&#xff1a;server applet&#xff0c;即&#xff1a;运行在服务器端的小程序 Servlet就是一个接口&#xff0c;定义了Java类被浏览器访问到&#xff08;tomcat识别&#xff09;的规则。 将来我们定义…

java数据结构与算法刷题-----LeetCode504. 七进制数

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 倒退迭代&#xff08;除基取余法&#xff09;2. 省略掉反转操…

【JavaEE初阶系列】——文件操作 IO 之 文件系统操作

目录 &#x1f4dd;认识文件 &#x1f6a9;树型结构组织 和 目录 &#x1f388;绝对路径和相对路径 &#x1f6a9;文件类型 &#x1f4dd;文件系统操作 &#x1f388;File 概述 &#x1f388;File类的使用 1. 绝对路径 vs 相对路径 2. 路径分隔符 3. 静态成员变量 4…

小明记账簿-记账工具

今天不了技术&#xff0c;聊一下工具&#xff0c;最近耗费无数个日日夜夜&#xff0c;做了一个记账小程序&#xff0c;感觉很实用&#xff0c;简单方便&#xff0c;希望帮助那些需要帮助的人。 想轻松管理个人财务吗&#xff1f;试试小明记账簿吧&#xff01;它是一款便捷&…

篮桥杯刷题第n天(dp更新)

最长上升子序列 输入&#xff1a; 10 1 4 5 1 4 1 9 1 9 输出&#xff1a; 4 算法思想&#xff1a;记录每个数为结尾的最长子序列长度&#xff0c;作为dp数组。 eg&#xff1a;&#xff08;这个是以每个位置开头的记录最长的来穷举但核心仍为上述的算法思想&#xff09; …

vue快速入门(一)vue的导入方法

注释很详细&#xff0c;直接上代码 新增内容 下载js代码导入实例数据绑定显示 源码 index.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…

【解读Kubernetes架构】全面指南,带你掌握Kubernetes的设计原理与构成!

了解 Kubernetes 架构&#xff1a;综合指南 前言一、什么是 Kubernetes 架构&#xff1f;1.1、控制平面1.2、工作节点 二、Kubernetes 控制平面组件2.1、kube-api服务器2.2、etcd2.3、kube-scheduler2.4、Kube 控制器管理器2.5、云控制器管理器 &#xff08;CCM&#xff09; 三…

【HarmonyOS】ArkUI - 动画

利用属性动画、显示动画、组件转场动画实现组件动画效果。 一、属性动画 属性动画是通过设置组件的 animation 属性来给组件添加动画&#xff0c;当组件的 width、height、Opacity、backgroundColor、scale、rotate、translate 等属性变更时&#xff0c;可以实现渐变过渡效果。…

有人用GPT来做日内交易,居然赚钱了!但是……

在我们还在烦恼会不会被AI替代时&#xff0c;已经有人在教ChatGPT去炒股票了。 在近年ChatGPT火速出圈后&#xff0c;围绕AI能取代什么职业的讨论持续受到大众关注。 从事客服、编程、法律合规以及内容创作等行业人员最早感受到这股AI带来的寒意。 那ChatGPT能不能替代交易员…

Web APIs简介 Dom

JS的组成 API API 是一些预先定义的函数&#xff0c;目的是提供应用程序与开发人员基于软件或硬件得以访问一组例程的能力&#xff0c;而又无需访问源码&#xff0c;或理解内部工作机制的细节 简单理解&#xff1a;API是给程序员提供的一种工具&#xff0c;以便能更轻松的实现…

[计算机知识] 各种小问题思考

哈希算法以及哈希冲突 哈希算法&#xff1a;将任何长度的输入通过散列函数转换成固定长度的字符串 哈希冲突&#xff1a;不同的输入经过哈希函数处理后得到相同的哈希值 因为哈希函数的输出域是有限的 解决哈希冲突&#xff1a; 1. 开放寻址&#xff1a;产生哈希冲突后&…

刷题之Leetcode704题(超级详细)

704. 二分查找 力扣题目链接(opens new window)https://leetcode.cn/problems/binary-search/ 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&am…