Kubernetes集群的搭建与DevOps实践(下)- 部署实践篇
需求清单100张数据表要迁移还要支持后续动态新增双链路同步MySQL到MySQL、MongoDB到PostgreSQL不能写死配置要能灵活扩展工期不到1个月技术约束源环境塔外和目标环境塔内网络完全隔离塔外只能读源库无法访问目标库塔内只能写目标库无法访问源库两端唯一的桥梁阿里云OSS塔外只能写塔内可以读写塔内不支持MongoDB必须用PostgreSQL替代数据规模单表最大1000万行数据单店铺单表50万行涉及1000个店铺总计100张表那一刻我脑海里浮现的画面是在公司地下室疯狂写MyBatis、语句直到猝死...但最终我不仅提前5天完成迁移还搞出了一套能让后续表秒级上线的全自动化流水线。怎么做到的答案就藏在Navicat的导入/导出功能里——直接构造SQL文件上传OSS塔内执行复杂逻辑全都在塔外处理一眼望去的七大技术难点在开始动手前我先梳理了一下面临的挑战难点1表结构千差万别100张表每张表的字段、类型、主键都不一样。传统MyBatis方式意味着要写100个Mapper、100个实体类。后续新增表还得继续写代码复用度≈0。难点2同步策略多样化100张表需要支持四种同步策略条件各不相同全表同步基础配置表数据量小TRUNCATE后一次性插入全部数据公司级条件同步按company_id维度同步支持条件过滤店铺级增量同步有is_deleted和update_time的表按shop_id时间条件增量同步店铺级全量同步物理删除的表按shop_id维度全量同步单店铺数据每张表的策略和条件都不同需要支持灵活配置。难点3数据内容包含特殊字符某些字段的内容包含分号、单引号等SQL特殊字符如果不处理生成的SQL文件会在执行时语法报错。难点4超大数据量单表1000万数据一次性加载到内存必然OOM。而且生成的SQL文件可能几百MB网络传输和存储都是问题。难点5MongoDB到PostgreSQL的类型鸿沟MongoDB的ObjectId、BSON对象、数组类型PostgreSQL都不支持。需要做复杂的类型映射和转换。难点6网络隔离架构塔外和塔内网络完全隔离传统的ETL工具DataX根本用不了。它们都是读→处理→写的单机模式需要同时访问源库和目标库。解决方案自己搭建一个类似navicat的导入/导出能动态执行SQL的功能。难点7表间依赖关系导致的顺序问题部分表之间存在外键依赖关系如order_items依赖orders如果并发同步order_items先执行插入但orders还未同步 → 外键约束失败需要识别依赖关系先同步父表再同步子表保证数据完整性解决方案塔内扫描SQL文件时优先处理父表再并发处理其他表灵感来源Navicat是怎么做的某天深夜我打开Navicat准备手动导出第一批测试数据。盯着导出向导发呆的时候突然脑子里闪过一个念头Navicat是怎么做到导出任意表的我点开导出的.sql文件-- 删除旧表DROP TABLE IF EXISTS demo_table;-- 重建表结构CREATE TABLE demo_table (id int(11) NOT NULL AUTO_INCREMENT,name varchar(50) DEFAULT NULL,PRIMARY KEY (id)) ENGINEInnoDB;-- 插入数据INSERT INTO demo_table VALUES (1, test);豁然开朗Navicat的核心逻辑就是用SHOW CREATE TABLE获取表结构用SELECT *查询数据生成标准SQL文件用户手动在目标库执行如果我把这套逻辑自动化呢塔外自动查表结构、自动查数据、自动生成SQL、自动上传OSS塔内自动扫描OSS、自动读取SQL文件、自动执行这不就完美契合了塔外-塔内的架构约束吗核心方案设计整体架构流程f91cc3bf56d0539d6a6a3560ded4f617技术选型说明塔外系统技术栈组件 选型 使用场景 选型理由消息队列 RocketMQ 触发同步异步解耦进行SQL文件构造 支持TAG过滤(MySQLToMySQL/MongodbToPgSQL顺序消费保证数据一致性支持可后续扩展同步类型例如RedisToMySQL流式处理 JDBC StreamMongoTemplate 读取超大表数据 避免OOMsetFetchSize(Integer.MIN_VALUE)启用MySQL服务器端游标Mongo使用流式读取的api内存占用恒定配置管理 MySQL配置表 管理同步规则 配置驱动新增表无需改代码支持占位符动态替换{shopId}/{companyId}文件上传 阿里云OSS SDK SQL文件上传 唯一能打通塔外塔内的桥梁可用性99.995%支持大文件塔内系统技术栈组件 选型 使用场景 选型理由并发控制 CompletableFuture 并发处理多个SQL文件 JDK8原生无需引入第三方库轻量级异步编程文件下载 阿里云OSS SDK SQL文件下载和删除 流式下载支持逐行读取执行成功后立即删除防止重复批量执行 JDBC Batch SQL批量执行 1000条/批平衡性能和内存setAutoCommit(true)防止事务过大第一难100张表结构各异怎么动态生成SQL传统方案的绝望之路如果用传统MyBatis写法画面会是这样SELECT id, name, create_time FROM table_1 WHERE shop_id #{shopId}SELECT id, title, status FROM table_2 WHERE company_id #{companyId}手写100个Mapper别说一个月一年都写不完而且后续新增表还得继续写代码复用度约等于0。灵感来源SHOW CREATE TABLEMySQL提供了一个神器SHOW CREATE TABLESHOW CREATE TABLE user_info;输出CREATE TABLE user_info (id int(11) NOT NULL AUTO_INCREMENT,username varchar(50) DEFAULT NULL,create_time datetime DEFAULT NULL,PRIMARY KEY (id)) ENGINEInnoDB;拿到建表语句 拿到了一切表信息字段名、类型、主键...核心实现动态解析表结构public TableStructure getTableStructure(DataSource ds, String tableName) {String sql SHOW CREATE TABLE tableName ;try (Connection conn ds.getConnection();Statement stmt conn.createStatement();ResultSet rs stmt.executeQuery(sql)) {if (rs.next()) {String ddl rs.getString(2); // 第2列是DDL语句// 核心正则解析DDL语句List columns parseColumns(ddl); // 提取字段名String primaryKey parsePrimaryKey(ddl); // 提取主键return new TableStructure(columns, primaryKey);}}return null;}关键亮点表名转义防止关键字冲突如表名叫order、user正则解析DDL一次性获取字段、主键、类型信息零硬编码任何表都能自动处理后续新增表只需加配置你问怎么知道哪张表要同步表名从哪来请继续往下看...第三难中有解决方案通过配置表实现这里用到JDBC编程适合当前业务需求古法编程不得已而为之生成完整SQL文件拿到表结构后生成标准SQL文件// 1. 先删除目标环境的旧数据保证幂等性String deleteStatement DELETE FROM user_info WHERE shop_id 12345;\n;// 2. 批量插入新数据(每批1000条)String insertStatement INSERT INTO user_info (id, username, create_time) VALUES\n (1, Alice, 2025-01-01 12:00:00),\n (2, Bob, 2025-01-02 13:00:00);\n;上传到OSS后塔内直接逐行读取执行完美第二难数据里有分号SQL会被切割炸掉问题现场默认SQL语句以;结尾但数据内容可能包含各种特殊情况-- 情况1: 数据中包含分号INSERT INTO content VALUES (1, 教程:Java;Spring;MyBatis);-- 情况2: 数据以分号结尾INSERT INTO config VALUES (2, path/usr/local/bin;);-- 情况3: 数据中有换行符,且以;结尾INSERT INTO article VALUES (3, 第一行第二行;第三行);塔内如果用;判断SQL结束String line reader.readLine();// 只读到: INSERT INTO content VALUES (1, 教程:Java// 数据被截断了导致SQL切割错位、语法报错。解决方案特殊符号标记 逐行读取核心思路每条SQL独占一行用特殊符号;#END#标记结束塔外生成SQL时// 关键使用特殊符号作为SQL结束标记String SPECIAL_DELIMITER ;#END#;// 构造SQL数据内容里的分号、换行符都不处理String sql INSERT INTO content VALUES (1, Java;Spring);// 写入文件每条SQL独占一行以特殊符号结尾writer.write(sql SPECIAL_DELIMITER);writer.write(System.lineSeparator()); // 系统换行符上传到OSS的文件内容INSERT INTO content VALUES (1, Java;Spring);#END#INSERT INTO config VALUES (2, path/usr/bin;);#END#INSERT INTO article VALUES (3, 第一行\n第二行);#END#说明每条SQL独占一行以System.lineSeparator()换行每条SQL以;#END#结尾完整的SQL结束标记数据内容里的分号;、换行符\n等都保持原样塔内执行前还原try (BufferedReader reader new BufferedReader(new InputStreamReader(ossStream))) {List sqlBatch new ArrayList();StringBuilder currentSql new StringBuilder();String line;while ((line reader.readLine()) ! null) {// 拼接当前行currentSql.append(line);// 检查是否是完整的SQL以;#END#结尾if (currentSql.toString().endsWith(;#END#)) {// 还原特殊符号 → 正常分号String realSql currentSql.toString().replace(;#END#, ;);// 添加到批次sqlBatch.add(realSql);currentSql.setLength(0); // 清空准备下一条SQL// 批量执行每500条一批if (sqlBatch.size() 100) {executeBatch(stmt, sqlBatch);sqlBatch.clear();}}}// 执行剩余SQLif (!sqlBatch.isEmpty()) {executeBatch(stmt, sqlBatch);}}为什么选;#END#足够长不会和数据内容冲突实测几千万条数据从未冲突标记明确易于理解塔内处理简单一行代码搞定关键点为什么塔内要逐行读取原因一SQL文件可能很大单个SQL文件可能达到几百MB如50万行数据如果一次性读取内存占用过高100MB文件加载需要几百MB内存而且多线程处理更容易造成OOMGC压力大大对象频繁创建和回收原因二无法按普通分号切割如果用;切割会出错// ? 错误做法String[] sqls allContent.split(;); // 会误切数据里的分号正确做法逐行拼接遇到;#END#才算完整// ? 正确做法StringBuilder currentSql new StringBuilder();while ((line reader.readLine()) ! null) {currentSql.append(line);if (currentSql.toString().endsWith(;#END#)) {String sql currentSql.toString().replace(;#END#, ;);executeBatch(sql);currentSql.setLength(0); // 清空准备下一条}}SQL文件格式示例DELETE FROM table WHERE id 1;#END#INSERT INTO table VALUES (1, data;with;semicolons);#END#INSERT INTO table VALUES (2, line1\nline2);#END#第三难同步策略多样化怎么灵活配置背景四种同步策略同步策略 适用场景 SQL操作 数据范围全表同步 基础配置表数据量小千行级 TRUNCATE INSERT 整张表的所有数据公司级条件同步 按公司维度管理的表 DELETE WHERE company_id? INSERT 单个公司的所有数据店铺级增量同步 有软删除标记和更新时间的表 DELETE WHERE shop_id? AND ... INSERT 单店铺增量数据店铺级全量同步 物理删除的表 DELETE WHERE shop_id? INSERT 单店铺全部数据问题100张表里四种策略混杂查询条件各不相同。需要灵活配置每张表的同步策略和WHERE条件。解决方案配置驱动 占位符核心思想把同步策略、查询条件放到配置表里每张表单独配置配置表设计CREATE TABLE sync_config (id int PRIMARY KEY,table_name varchar(100),table_level varchar(20), -- company/shopsync_type int, -- 0:全表, 1:条件同步where_condition text, -- WHERE条件模板支持占位符delete_strategy varchar(20) -- TRUNCATE/DELETE);配置示例-- 全表同步INSERT INTO sync_config VALUES (1, sys_config, company, 0, NULL, TRUNCATE);-- 公司级条件同步INSERT INTO sync_config VALUES (2, company_settings, company, 1,company_id {companyId} AND status 1, DELETE);-- 店铺级增量同步INSERT INTO sync_config VALUES (3, user_table, shop, 1,shop_id {shopId} AND update_time {lastTime}, DELETE);-- 店铺级全量同步INSERT INTO sync_config VALUES (4, order_table, shop, 1,shop_id {shopId}, DELETE);占位符替换逻辑private String buildWhereCondition(String template, SyncContext ctx) {if (template null) return ; // 全表同步无WHERE条件return template.replace({shopId}, String.valueOf(ctx.getShopId())).replace({companyId}, String.valueOf(ctx.getCompanyId())).replace({lastTime}, ctx.getLastSyncTime());}SQL生成过程以店铺级增量同步为例步骤1构造查询SQL// 占位符替换后得到WHERE条件String whereCondition shop_id 123 AND update_time 2025-01-15 00:00:00;// 构造SELECT语句String selectSql SELECT * FROM user_table WHERE whereCondition;步骤2流式读取并生成SQL文件关键点从ResultSet元数据动态获取字段而非写死字段名try (ResultSet rs stmt.executeQuery(selectSql)) {ResultSetMetaData metadata rs.getMetaData();int columnCount metadata.getColumnCount();// 从元数据获取列名列表List columnNames new ArrayList();for (int i 1; i columnCount; i) {columnNames.add(metadata.getColumnName(i));}// 1. 先写DELETE语句writer.write(DELETE FROM user_table WHERE whereCondition ;#END#);writer.write(System.lineSeparator());// 2. 构造INSERT语句头部字段名从元数据获取String insertHeader INSERT INTO user_table ( String.join(, , columnNames) ) VALUES\n;StringBuilder values new StringBuilder();int batchCount 0;// 3. 流式读取数据并拼接VALUESwhile (rs.next()) {values.append(();for (int i 1; i columnCount; i) {if (i 1) values.append(, );// 根据字段类型格式化值动态处理values.append(formatValue(rs, i, metadata.getColumnType(i)));}values.append());batchCount;// 每10行生成一条INSERTif (batchCount 10) {writer.write(insertHeader values.toString() ;#END#);writer.write(System.lineSeparator());values.setLength(0);batchCount 0;} else {values.append(, );}}// 4. 处理剩余数据if (batchCount 0) {writer.write(insertHeader values.toString() ;#END#);}}最终生成的SQL文件DELETE FROM user_table WHERE shop_id 123 AND update_time 2025-01-15 00:00:00;#END#INSERT INTO user_table (id, shop_id, username, update_time) VALUES(1, 123, Alice, 2025-01-16 10:00:00),(2, 123, Bob, 2025-01-16 11:00:00);#END#优势总结? 灵活性四种策略自由配置满足不同表的需求? 可扩展新增表只需加配置代码零改动? 占位符支持{shopId}、{companyId}、{lastTime}等动态参数? 零硬编码字段名从元数据动态获取适配任意表结构第四难单表50W数据如何防止OOM问题传统方式的内存杀手// 反面教材一次性加载全部数据String sql SELECT * FROM huge_table WHERE shop_id 123;List allRows jdbcTemplate.queryForList(sql); // 直接OOM单店铺单表可能50W行全部加载到内存会导致OutOfMemoryError。解决方案流式读取 临时文件MySQL流式读取private void generateSQL(DataSource ds, String sql) throws SQLException {try (Connection conn ds.getConnection();Statement stmt conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, // 只向前遍历ResultSet.CONCUR_READ_ONLY)) { // 只读模式// 核心启用MySQL流式读取stmt.setFetchSize(Integer.MIN_VALUE); // MySQL JDBC特殊约定try (ResultSet rs stmt.executeQuery(sql)) {int batchCount 0;StringBuilder sqlValues new StringBuilder();while (rs.next()) { // 逐行处理sqlValues.append(();for (int i 1; i columnCount; i) {sqlValues.append(formatValue(rs, i));}sqlValues.append());batchCount;// 每10行生成一条INSERTif (batchCount 10) {writeInsert(sqlValues.toString());sqlValues.setLength(0); // 清空缓冲batchCount 0;}}}}}核心技巧stmt.setFetchSize(Integer.MIN_VALUE)MySQL JDBC的特殊约定启用服务器端游标每次只拉取1行数据到客户端内存占用恒定批量拼接VALUES多行生成一条INSERT减少SQL数量MongoDB流式读取CloseableIterator iterator mongoTemplate.stream(query, Document.class, collectionName);try {while (iterator.hasNext()) {Document doc iterator.next(); // 逐文档处理processDocument(doc);}} finally {iterator.close(); // ?? 必须手动关闭否则连接泄漏}塔内执行流式读取try (BufferedReader reader new BufferedReader(new InputStreamReader(ossStream))) {List sqlBatch new ArrayList();StringBuilder currentSql new StringBuilder();String line;while ((line reader.readLine()) ! null) {// 拼接当前行currentSql.append(line);// 检查是否是完整的SQL以;#END#结尾if (currentSql.toString().endsWith(;#END#)) {// 还原特殊符号 → 正常分号String realSql currentSql.toString().replace(;#END#, ;);// 添加到批次sqlBatch.add(realSql);currentSql.setLength(0); // 清空准备下一条SQL// 批量执行每100条一批塔外10条数据构造成1个insert语句if (sqlBatch.size() 100) {executeBatch(stmt, sqlBatch);sqlBatch.clear();}}}// 执行剩余SQLif (!sqlBatch.isEmpty()) {executeBatch(stmt, sqlBatch);}// 关键自动提交避免事务过大conn.setAutoCommit(true);}为什么setAutoCommit(true)单文件可能几千条SQL如果在一个事务里会导致锁表时间过长回滚日志暴涨内存占用飙升自动提交后每条SQL独立提交避免以上问题。效果对比方案 内存占用 风险一次性加载 2GB(50W行) 必然OOM流式处理 50MB(常量级) 稳定第五难MongoDB到PostgreSQL的类型转换问题MongoDB和PostgreSQL的数据类型完全不兼容MongoDB PostgreSQL 问题ObjectId 无对应类型 主键转换BSON对象 JSONB 嵌套结构数组 Array 类型声明解决方案在配置表的扩展字段定义类型映射{mongoCollection: user_profile,pgTable: user_profile,fieldMapping: {_id: id,preferences: preferences,tags: tags},typeMapping: {_id: OBJECTID_TO_VARCHAR,preferences: JSONB,tags: INTEGER_ARRAY}}类型转换代码private String convertValue(Object value, String typeRule) {if (value null) return NULL;switch (typeRule) {case JSONB:// {name: test} → {name:test}::jsonbString json toJsonString(value);return escapeSql(json) ::jsonb;case INTEGER_ARRAY:// [1,2,3] → ARRAY[1,2,3]::INTEGER[]List list (List) value;return ARRAY[ String.join(,, list) ]::INTEGER[];case OBJECTID_TO_VARCHAR:// ObjectId(507f...) → 507f...return value.toString() ;default:return convertDefault(value);}}复盘一个月完成迁移的关键整体架构塔外-塔内双链路┌──────────── 塔外系统 (Outer) ────────────┐│ ││ ① API触发同步 ││ ② 查询配置表 → 拆分公司级/店铺级配置 ││ ③ 构建MQ消息 → 投递RocketMQ ││ ④ MQ Consumer ││ ├─ SHOW CREATE TABLE 获取表结构 ││ ├─ 流式读取源数据库 ││ ├─ 生成 DELETE INSERT SQL ││ ├─ 分号替换为特殊符号 ││ └─ 上传到 OSS │└───────────────────────────────────────────┘││ OSS中转↓┌──────────── 塔内系统 (Inner) ────────────┐│ ││ ⑤ 定时任务 / 手动触发 ││ ⑥ 扫描OSS目录 → 获取待处理SQL文件列表 ││ ⑦ 流式下载SQL文件 → 逐行读取 ││ ├─ 特殊符号还原为分号 ││ ├─ 批量执行(1000条/批) ││ └─ setAutoCommit(true) 防止事务过大 ││ ⑧ 执行成功 → 立即删除OSS文件 │└───────────────────────────────────────────┘核心亮点总结技术点 传统方案 本方案 效果表结构获取 手写100个Mapper SHOW CREATE TABLE动态解析 零硬编码支持任意表SQL分隔符 用;判断结束 特殊符号;#END# 支持数据含分号、换行符同步策略 全量同步or硬编码 配置表占位符 灵活配置4种策略大数据量处理 一次性加载(OOM) 流式读取临时文件 常量级内存50W行稳定扩展性 新增表需改代码 只需加配置 秒级上线新表同步做对的3件事1. 从工具中偷师学艺Navicat的导入/导出功能启发了整体方案SHOW CREATE TABLE是突破口2. 把复杂逻辑放在塔外塔内只负责执行SQL逻辑简单塔外可以随意调试、优化3. 配置驱动而非代码驱动新增表只需加配置不改代码。后续维护成本趋近于0最终效果指标 数据迁移表数量 200张含后续新增最大单表数据 1000万行首次全量同步 10-30分钟日常增量同步 公司级表约30秒店铺级表约1分钟内存占用 稳定在200MB左右OOM次数 0连续运行3个月工期 25天提前5天完成写在最后以上便是我这次迁移实战的全部分享。绝非标准答案但希望能为你带来一丝灵感。这次迁移让我深刻体会到好的架构不是设计出来的而是从实际问题中偷出来的。当你面对技术难题时不妨问自己有没有现成的工具已经解决了类似问题不要重复造轮子Navicat数据库/框架本身提供了什么能力SHOW CREATE TABLE、setFetchSize能否用配置代替硬编码配置表占位符感谢那些默默扛下所有的技术细节SHOW CREATE TABLE —— 你扛下了表结构解析的苦活stmt.setFetchSize(Integer.MIN_VALUE) —— 你默默守护了内存安全;#END# —— 你可能是全网最诡异但最实用的分隔符RocketMQ的TAG过滤 —— 你让消息路由变得优雅CompletableFuture —— 你让塔内并发处理成为可能System.lineSeparator() —— 你让SQL文件格式清晰明了最后送大家一段话写代码的时候我们都是站在巨人肩膀上的追梦人。技术本身没有高低贵贱能解决问题的就是好技术。不要盲目追求所谓的最佳实践在约束下求最优解才是工程师的智慧。按掖咀墒
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2463427.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!