Seatunnel实战:构建Mysql到Hive的稳定数据同步管道
1. 为什么选择Seatunnel做数据同步第一次接触Seatunnel是在去年一个数据仓库迁移项目里。当时客户要求把几十个MySQL业务库的数据实时同步到Hive做分析试了好几个工具都不太理想。要么配置复杂得要命要么性能跟不上直到发现了这个宝藏工具。Seatunnel最让我惊喜的是它的简单暴力。你可能想象不到只需要一个不到100行的配置文件就能搞定MySQL到Hive的全量增量同步。我实测过单任务每天稳定同步20亿数据延迟控制在5分钟以内这在以前用Sqoop的时候简直不敢想。它的核心优势其实就三点第一是配置极其简单完全不需要写代码第二是性能炸裂底层基于Spark引擎第三是稳定性超强自带断点续传和Exactly-Once语义保障。这三点正好戳中了企业级数据同步的所有痛点。2. 环境准备与安装部署2.1 基础环境检查在开始之前建议先检查下你的环境是否符合这些要求已经部署好的Hadoop集群CDH或Apache原生版本都行至少Spark 2.4环境MySQL服务可正常连接Hive Metastore服务正常我遇到过最常见的问题就是Spark和Hive的版本兼容性。比如有次用Spark 3.1连CDH5的Hive就各种报错后来换成Spark 2.4才解决。所以如果你要用生产环境强烈建议先用测试环境验证下版本组合。2.2 Seatunnel安装安装过程简单到令人发指# 下载解压 wget https://archive.apache.org/dist/incubator/seatunnel/2.1.1/apache-seatunnel-incubating-2.1.1-bin.tar.gz tar -zxvf apache-seatunnel-incubating-2.1.1-bin.tar.gz -C /opt/ cd /opt/apache-seatunnel-incubating-2.1.1 # 配置环境变量 echo export SEATUNNEL_HOME/opt/apache-seatunnel-incubating-2.1.1 ~/.bashrc source ~/.bashrc注意一个小坑Seatunnel默认不带MySQL JDBC驱动需要手动放到lib目录下cp mysql-connector-java-8.0.23.jar $SEATUNNEL_HOME/lib/3. 全量同步配置实战3.1 基础配置文件解析先看一个最基础的同步配置模板vim $SEATUNNEL_HOME/jobs/mysql_to_hive.conf文件内容如下env { spark.app.name mysql_to_hive_sync spark.executor.memory 4g spark.executor.cores 2 spark.executor.instances 10 } source { jdbc { driver com.mysql.jdbc.Driver url jdbc:mysql://mysql-prod:3306/order_db table orders user reader password safe_password result_table_name source_table } } transform { # 这里可以加数据转换逻辑 } sink { hive { sql insert overwrite table ods.orders select * from source_table } }这个配置做了三件事设置Spark作业的基础参数从MySQL的order_db.orders表读取数据全量覆盖写入到Hive的ods.orders表3.2 分区表特殊处理如果目标Hive表是分区表配置需要稍作调整sink { hive { sql insert overwrite table ods.orders partition(dt${partition_date}) select *, ${partition_date} as dt from source_table } }这里有个实用技巧可以通过${variable}的方式动态传入分区值。我一般会在外层包装一个shell脚本自动生成当天的日期作为分区值。4. 增量同步方案设计4.1 基于时间戳的增量同步生产环境更常见的是增量同步场景。假设表里有create_time字段可以这样配置source { jdbc { # ...其他参数同上 query select * from orders where create_time ${last_update_time} } }这里的关键是要有个地方存储last_update_time。我的做法是用Hive建个元数据表来记录CREATE TABLE IF NOT EXISTS sync_metadata ( db_name STRING, table_name STRING, last_update STRING );4.2 Exactly-Once保障机制要实现不丢不重的精准一次同步需要组合使用以下参数source { jdbc { # 启用增量模式 incremental.column id incremental.start ${start_id} # 每次读取10000条 fetch.size 10000 } } sink { hive { # 使用事务写入 write.mode append # 启用事务表 transactional true } }这个方案的核心是通过incremental.column指定自增ID列每次同步完成后记录最大的ID值下次从这个点继续。5. 性能调优实战经验5.1 关键参数优化经过多次压测这几个参数对性能影响最大参数建议值说明spark.executor.instances10-20根据数据量调整spark.executor.memory4-8g太大反而容易OOMspark.sql.shuffle.partitions200-400控制reduce阶段并行度fetch.size5000-10000MySQL每次fetch行数特别提醒不要盲目增加executor内存我遇到过设为8g反而比4g慢的情况原因是GC时间变长了。5.2 并行读取技巧对于大表可以使用分片并行读取source { jdbc { # 按照id范围分4片读取 partition_column id partition_lower_bound 1 partition_upper_bound 1000000 partition_num 4 } }这个配置会让4个executor并行读取不同id区间的数据实测能让吞吐量提升3-5倍。6. 生产环境运维方案6.1 自动化脚本模板分享一个我在生产环境用的自动化脚本框架#!/bin/bash # 获取前一天日期 SYNC_DATE$(date -d -1 day %Y%m%d) # 生成配置文件 cat $SEATUNNEL_HOME/jobs/order_sync_${SYNC_DATE}.conf EOF env { spark.app.name order_sync_${SYNC_DATE} # ...其他参数 } source { jdbc { query select * from orders where date_format(create_time,%Y%m%d)${SYNC_DATE} } } sink { hive { sql insert overwrite table ods.orders partition(dt${SYNC_DATE}) select * from source_table } } EOF # 提交任务 $SEATUNNEL_HOME/bin/start-seatunnel-spark.sh \ --master yarn \ --deploy-mode cluster \ --config $SEATUNNEL_HOME/jobs/order_sync_${SYNC_DATE}.conf # 错误处理 if [ $? -ne 0 ]; then echo 同步失败 | mail -s 订单表同步报警 opsexample.com fi6.2 监控与告警建议在脚本中加入以下监控点源表和数据量校验任务执行时间监控目标表数据完整性检查可以用简单的方式实现比如在同步完成后执行SELECT COUNT(1) FROM ods.orders WHERE dt${SYNC_DATE}然后和源表count结果对比。7. 常见坑与解决方案7.1 字符集问题遇到乱码时检查MySQL连接URL是否包含url jdbc:mysql://host:3306/db?useUnicodetruecharacterEncodingutf87.2 时区不一致如果发现时间字段差8小时需要添加时区参数url jdbc:mysql://host:3306/db?serverTimezoneAsia/Shanghai7.3 大字段处理同步text/blob类型字段时要调整fetch.sizejdbc { fetch.size 1000 # 比常规值小 }最近在金融项目里同步一个包含超大JSON字段的表时发现设为500性能反而比1000好这个需要根据实际情况测试。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2507137.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!