FlinkCDC实战:从单表到多源合并,一键搞定MySQL实时同步(Flink 1.16.2)
1. 环境准备与基础配置在开始FlinkCDC实战之前我们需要先搭建好基础环境。我建议使用Linux系统进行操作这里以CentOS 7为例。首先确保你已经安装了JDK 1.8这是Flink运行的基本要求。下载Flink 1.16.2安装包时要注意选择与Scala版本匹配的包。我遇到过不少新手因为选错版本导致各种奇怪的报错所以这里特别提醒我们使用的是scala_2.12版本。解压后你会得到一个flink-1.16.2目录这就是我们的工作目录。关键的依赖包有两个flink-sql-connector-mysql-cdc-2.3.0.jar用于捕获MySQL变更数据flink-connector-jdbc-3.0.0-1.16.jar用于写入目标MySQL这两个jar包需要放在flink的lib目录下。我习惯把常用依赖都放在这里避免每次启动时都要指定classpath。在实际项目中你可能还需要考虑把这些依赖打包到自定义镜像中方便部署。MySQL版本兼容性是个需要注意的点。经过实测FlinkCDC 2.3.0可以很好地支持MySQL 5.7和8.0版本。如果你的生产环境还在用更老的MySQL版本建议先升级否则可能会遇到一些奇怪的兼容性问题。2. 单表同步实战2.1 基础同步配置我们先从最简单的单表同步开始。假设要把源库alarm中的alarminfo表同步到目标库首先需要启动Flink集群./bin/start-cluster.sh然后启动SQL客户端./bin/sql-client.sh在SQL客户端中我们需要定义两个表源表和目标表。源表使用mysql-cdc连接器这是FlinkCDC的核心组件。这里有个小技巧定义表时最好指定主键这样Flink能更高效地处理数据变更。CREATE TABLE source_table ( id STRING NOT NULL, AlarmTypeID STRING, Time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname source-mysql, port 3306, username flinkuser, password flinkpass, database-name alarm, table-name alarminfo, server-time-zone Asia/Shanghai );目标表使用jdbc连接器这里有几个重要参数需要注意url中的useSSLtrue在生产环境是必须的serverTimezoneAsia/Shanghai要和源表一致scan.fetch-size影响查询性能默认值通常偏小CREATE TABLE target_table ( id STRING NOT NULL, AlarmTypeID STRING, Time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector jdbc, url jdbc:mysql://target-mysql:3306/alarm?useSSLtrueserverTimezoneAsia/Shanghai, username flinkuser, password flinkpass, table-name alarminfo );2.2 启动同步任务定义好表结构后启动同步只需要一条简单的INSERT语句INSERT INTO target_table SELECT * FROM source_table;这条语句会先做全量同步然后自动转为增量同步。我第一次用时觉得很神奇FlinkCDC内部自动处理了全量和增量的切换完全不需要人工干预。在Flink UI上可以看到这个作业的运行情况。建议新手多观察UI上的指标特别是numRecordsIn和numRecordsOut这两个指标能直观反映数据同步的吞吐量。如果发现numRecordsOut明显小于numRecordsIn可能出现了数据丢失需要检查配置。3. 多源表合并同步3.1 多源表配置实际项目中经常需要把多个源表合并到一个目标表。比如有两个MySQL实例都存有告警数据我们希望合并到一个分析库中。这时可以用UNION ALL来实现同时用额外字段标记数据来源。首先定义两个源表CREATE TABLE source_table_50 ( id STRING NOT NULL, AlarmTypeID STRING, Time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname 192.168.1.50, port 3306, username flinkuser, password flinkpass, database-name alarm, table-name alarminfo ); CREATE TABLE source_table_51 ( id STRING NOT NULL, AlarmTypeID STRING, Time TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector mysql-cdc, hostname 192.168.1.51, port 3306, username flinkuser, password flinkpass, database-name alarm, table-name alarminfo );目标表增加source_line字段标识来源CREATE TABLE target_table ( id STRING NOT NULL, AlarmTypeID STRING, Time TIMESTAMP(3), source_line INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector jdbc, url jdbc:mysql://target-mysql:3306/alarm?useSSLtrue, username flinkuser, password flinkpass, table-name alarminfo );3.2 使用STATEMENT SET管理事务多表同步建议使用STATEMENT SET来保证原子性BEGIN STATEMENT SET; INSERT INTO target_table SELECT *, 50 AS source_line FROM source_table_50; INSERT INTO target_table SELECT *, 51 AS source_line FROM source_table_51; END;这种方式比直接用UNION ALL更灵活可以单独控制每个插入语句。我在实际项目中发现当源表结构不完全相同时STATEMENT SET的方式更容易维护。4. 自动化部署与优化4.1 使用初始化脚本每次都手动输入SQL很麻烦我们可以把配置写成脚本文件。创建init.sql设置作业参数SET execution.runtime-modestreaming; SET pipeline.namemysql_cdc_job; SET parallelism.default4;再创建flinkSqlInit.sql包含所有表定义和同步逻辑-- 源表定义 CREATE TABLE IF NOT EXISTS source_table_50 (...); -- 目标表定义 CREATE TABLE IF NOT EXISTS target_table (...); -- 启动同步 INSERT INTO target_table SELECT *, 50 AS source_line FROM source_table_50;然后一键启动./bin/sql-client.sh -i init.sql -f flinkSqlInit.sql4.2 性能调优建议经过多次实战我总结出几个性能优化点适当增大scan.fetch-size默认值偏小scan.fetch-size 2000调整checkpoint间隔根据数据重要性SET execution.checkpointing.interval 30s;并行度设置要合理通常和CPU核数相关SET parallelism.default 8;对于大表可以配置chunk参数scan.incremental.snapshot.chunk.size8096网络不稳定时增加重试次数connect.max-retries 105. 常见问题排查5.1 连接问题最常见的错误是连接失败。检查以下几点用户名密码是否正确网络是否互通MySQL是否配置了白名单SSL配置是否正确5.2 数据不一致如果发现目标表数据不全检查binlog是否开启确认用户有REPLICATION权限查看Flink日志是否有异常5.3 性能问题同步延迟大时可以考虑增加并行度调整chunk大小升级硬件配置火焰图是个很好的分析工具在flink-conf.yaml中添加rest.flamegraph.enabled: true但要注意生产环境慎用因为会带来额外开销。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2541711.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!