开源共建 | Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

news2025/8/16 15:21:16

一、前言

ChunJun(原FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka等。同时 ChunJun 也是一个支持原生 FlinkSql所有语法和特性的计算框架。

ChunJun 具有丰富的插件种类,多达40种,如常见的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及维表功能。目前很多用户在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文将带来如何在 Dinky 上集成 ChunJun 丰富的插件,其实简单,那我们开始吧。

二、部署 Flink+ChunJun

编译

注意,如果需要集成 Dinky,需要将 ChunJun项目下的 chunjun-core 的pom 文件中的 logback-classic 和 logback-core 注释掉,否则容易在 Dinky 执行 sql 任务的时候报错。

file 然后执行:

file

部署

使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指导。

值得注意的是,如果你需要调用 Flinkx 的 connect jar 的话,则需要将 classloader.resolve-order 改成 parent-first。修改完成配置以后,把 Flinkx 的 jar 包复制过来,主要是 chunjun-clients-master.jar(Flinkx 现在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目录下,如图所示。 file

异常处理

如果启动集群时出现异常,即 Flink standalone 集群加载 flinkx-dist 里 jar 包之后,集群无法启动,日志报错:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY at org.apache.logging.log4j.core.config.ConfigurationSource. (ConfigurationSource.java:56) at org.apache.logging.log4j.core.config.NullConfiguration.< init>(NullConfiguration.java:32) at org.apache.logging.log4j.core.LoggerContext.< clinit>(LoggerContext.java:85) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.log4j.LogManager.< clinit>(LogManager.java:72) at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285) at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.< clinit>(ClusterEntrypoint.java:107)

原因:这个报错是因为 log4j 版本不统一导致的,因为 flinkx-dist 中部分插件引用的还是旧版本的 log4j 依赖,导致集群启动过程中,出现了类冲突问题;

方案:临时方案是将 flink lib 中 log4j 相关的jar包名字前加上字符 ‘a‘,使得flink standalone jvm 优先加载。 file

三、部署 Dinky

编译

file 编译完成后的压缩包在 Dinky 根目录下的 build 文件夹下。

部署

1、上传dlink压缩包到部署服务器

2、解压

file 3、数据库初始化

4、把 flink 的 jar 放到 dlink 目录下

file

因为目前 flinkx 的稳定版本是 1.12.7,所以我们把 dlink 默认的 client 版本修改为 1.12

file lib下的目录如图:

file 注意:因为我没有用上 dlink-connector-jdbc 的 jar 包,所以图中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 没有换成1.12版本的,可以去掉。

启动

启动命令 file

注册集群实例

在集群实例中注册已经启动的 Flink 集群。 file

四、示例分享

添加依赖

这里演示 mysql->mysql 的同步作业,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。 file

编写作业

Mysql DDL:

CREATE TABLE datasource_classify ( id int unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id', classify_code varchar(64) NOT NULL COMMENT '类型栏唯一编码', sorted int NOT NULL DEFAULT '0' COMMENT '类型栏排序字段 默认从0开始', classify_name varchar(64) NOT NULL COMMENT '类型名称 包含全部和常用栏', is_deleted tinyint NOT NULL DEFAULT '0' COMMENT '是否删除,1删除,0未删除', gmt_create datetime DEFAULT CURRENT_TIMESTAMP, gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY classify_code (classify_code) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='数据源分类表';

CREATE TABLE source ( id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp(9), gmt_modified timestamp(9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' ,'scan.fetch-size' = '2' ,'scan.query-timeout' = '10' );

CREATE TABLE sink ( id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp(9), gmt_modified timestamp(9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' ,'scan.fetch-size' = '2' ,'scan.query-timeout' = '10' );

insert into sink select * from source u;

执行任务

file 选中 Yarn Session 模式提交作业。 file 提交后可从执行历史查看作业提交状况。 file 进程中可以看的 Flink 集群上批作业执行完成。

对比数据

源库: file 目标库: file 同步成功,很丝滑。

五、总结

在集成 ChunJun 的时候遇到的问题大部分都是缺包以及包冲突,所以只需要注意一下这个问题就能比较好的进行集成。

在集成服务的时候建议是,先把 Flink 和 ChunJun 进行集成,确保服务能够正常启用以后再进行 Dinky 的集成,这样有利于快速定位查找问题,如果遇到文章之外的问题,也可以查看 Dinky 官网FAQ | Dinky (dlink.top) chunjun的官网QuickStart | ChunJun 纯钧 (dtstack.github.io/chunjun/),看看是否有类似问题的解决办法作为参考。

六、用户体验

因为本人目前还是处于学习使用的过程中,所以很多功能没有好好使用,待自己研究更加透彻后希望写一篇文章,优化官网的用户手册。以下的优缺点以及建议都是目前我在使用学习的过程中遇到的问题。

优点

Dinky 最吸引我的地方应该就是 sql 编辑模版了,直接快捷键生成 sql 模版,在开发测试中屡试不爽。在集成了 ChunJun(Flinkx) 以后,能够做到多源数据的离线跑批任务及日常小批量实时任务的同步。支持各种类型的任务执行方式。

缺点

ui 上适配还有点小问题,例如:打开 F12 调整宽度后,再关闭,页面 ui 不会自适应,需要刷新。

期待改进点

1、更多的自定义异常、业务异常

2、增加新的向导模式,结合数据源,通过 webUI 可以一键引入字段或者勾选需要的字段,生成 Flink Sql 的一大部分配置

CREATE TABLE 表名 ( -- 页面勾选字段,字段从元数据直接拉取 id bigint, classify_code STRING, sorted int, classify_name STRING, is_deleted int, gmt_create timestamp(9), gmt_modified timestamp(9), PRIMARY KEY (id) NOT ENFORCED ) WITH ( -- 从选择的数据中获取 'connector' = 'mysql-x', 'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false', 'table-name' = 'datasource_classify', 'username' = 'root', 'password' = 'root' , -- 其它非主要配置有用户自己填写 ); 3、sql 历史版本管理,目前我已经提交 Feature 并被合并到 0.6.5 版本中。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack/Taier

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/14893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(九)DateTime——PHP

文章目录第九章 Date & Time1 time()获取时间戳2 getDate()转换时间戳3 date()转换时间戳第九章 Date & Time 1 time()获取时间戳 time()函数返回的整数表示自1970年1月1日格林尼治标准时间午夜起经过的秒数。这一时刻称为UNIX历元&#xff0c;自那时起经过的秒钟数称…

Metabase学习教程:提问-1

创建交互式图表 可以通过使用查询生成器、构建模型或添加自定义目标来创建图表。供用户在Metabase中钻取数据。类似&#xff1a; 图1。放大特定类别和时间范围&#xff0c;然后查看构成图表上某个条形图的订单。 如果您只使用SQL编写过问题&#xff0c;那么您可能会忽略这样一…

颠覆IoT行业的开发神器!涂鸦智能重磅推出TuyaOS操作系统【程序员必备】

1 前言 作为降低 IoT 技术门槛的开发神器&#xff0c;TuyaOS 操作系统重磅发布 3.6.0 新版本啦&#xff01;针对设备安全、功耗、通信速率等关键功能&#xff0c;做了重大创新和优化升级。为了助力开发者更快速便捷地接入涂鸦IoT PaaS&#xff0c;并低门槛开发出有创意的智能单…

RabbitMQ初步到精通-第三章-RabbitMQ面板及环境搭建

第三章-RabbitMQ面板及环境搭建 1、RabbitMQ面板介绍 Rabbitmq安装完毕后&#xff0c;若是本地环境&#xff0c;打入&#xff1a;http://localhost:15672/#/ 进入到MQ的控制台页面中&#xff1a; 可以观察到此页面涉及的各个TAB&#xff0c;和我们前面介绍到的rabbitMQ架构中…

Docker入门学习笔记(狂神版)

下述笔记是自己花一天时间看B站狂神说Docker视频的笔记&#xff0c;下列的笔记是根据自己的实践的记录下来的&#xff0c;若想细学掌握Docker建议自行观看&#xff08;《Docker入门到精通》&#xff09;&#xff0c;去观看狂胜的视频记得三连支持一下。他的Docker讲解个人觉得是…

每日一个设计模式之【代理模式】

文章目录每日一个设计模式之【代理模式】☁️前言&#x1f389;&#x1f389;&#x1f389;&#x1f33b;模式概述&#x1f331;代理模式的实现&#x1f340;静态代理&#x1f340;动态代理&#x1f433;JDK代理&#x1f433;CGLib代理&#x1f340;拓展&#x1f433;虚拟代理&…

UE 5.1正式发布,有哪些值得一试的新功能?

UE 5.1正式发布&#xff0c;所以今天咱们就来看看最新版都具体更新和改进了哪些功能吧—— Nanite和Lumen Nanite和Lumen是UE 5.0版本更新的两个主要内容&#xff0c;UE 5.1则是对其进行进一步的改进。 Nanite添加了对双面材质和新的可编程光栅化程序的支持&#xff0c;可以通…

认识前端闭包

1、前言&#xff1a;&#xff08;先介绍一下函数的存储原理&#xff09; &#xff08;1&#xff09;基本函数存储原理&#xff1a; 首先我们定义一个函数&#xff0c;然后调用&#xff0c;如下&#xff1a; <script>function test(){let name yiyiconsole.log(name)}t…

未来的产品经理,需要什么样的原型设计工具?

我和原型工具的往事 作为一只工龄十年的产品汪&#xff0c;我一直在使用原型设计工具——摹客 &#xff0c;对于摹客可谓耳熟能详。 初次听说&#xff0c;是产品社群里的点赞安利&#xff1a;做移动端&#xff0c;用摹客。 当时正值2015年&#xff0c;移动端热到不行&#x…

【动态规划】字串中回文的个数

一、题目描述 给你一个字符串 s &#xff0c;请你统计并返回这个字符串中 回文子串 的数目。 注&#xff1a; 回文字符串 是正着读和倒过来读一样的字符串。子字符串 是字符串中的由连续字符组成的一个序列。具有不同开始位置或结束位置的子串&#xff0c;即使是由相同的字符…

Matlab论文插图绘制模板—水平三维柱状图(渐变)

在之前的文章中&#xff0c;分享了Matlab水平三维柱状图的绘制模板。 高度赋色的水平三维柱状图&#xff1a; 这次再来分享一下渐变赋色的水平三维柱状图的绘制模板。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;Matlab论文插图绘制模板系列&#xff0c;旨在降低大家…

CellMarker 2.0 | 鼠标点一点就完成单细胞分析的完美工具~

1写在前面 本期我们介绍一下CellMarker 2.0上更新的6个网页工具&#xff0c;主要是用于scRNA-seq数据的分析与可视化。&#x1f970; 网址如下&#xff1a;&#x1f447; &#x1f4cd;http://bio-bigdata.hrbmu.edu.cn/CellMarker/index.html 2Single cell web tools 概览 作者…

“幂等”不等于“分布式锁”,也不得不考虑返回值

. 概览 在分布式系统中&#xff0c;幂等是一个非常重要的概念&#xff0c;常常与“重试”一起出现。当调用一个远程服务发生超时&#xff0c;调用方并不知道请求是否执行成功&#xff0c;这就是典型的“第三态”问题。对于这个问题最常见的解决方案便是进行主动重试&#xff0…

20道前端高频面试题(附答案)

setTimeout 模拟 setInterval 描述&#xff1a;使用setTimeout模拟实现setInterval的功能。 实现&#xff1a; const mySetInterval(fn, time) {let timer null;const interval () > {timer setTimeout(() > {fn(); // time 时间之后会执行真正的函数fninterval()…

2022年NPDP新版教材知识集锦--【第三章节】(4)

【敏捷开发】 8.1敏捷开发模型的定义 门径和敏捷方法的特点&#xff1a;门径流程适用于开发硬件产品&#xff0c;而敏捷方法适用于开发软件产品。这两种方法是相对独立的。敏捷方法和门径流程不是互相取代的关系。相反敏捷方法是一种有效的微观规划工具或项目管理工具&#x…

如何使用远程控制软件并将用途最大化?4款国内外优质应用测评解析

说起远控软件&#xff0c;大家的第一印象是什么&#xff1f;是能实现电脑控制电脑、手机平板控制电脑、或手机电脑控制另外手机的操作需求&#xff0c;还是TeamViewer、ToDesk、向日葵、微软桌面这类的产品名称&#xff1f; 在三年疫情下&#xff0c;我们或多或少都经历了居家…

VMware安装Centos

此教程版本使用的是 VMware 16 、Centos 7 虚拟机安装 Centos安装 注: win10下vmware 15 可能会有蓝屏现象 排查:自行检查是否安装有 KB4601319 补丁,如果有请卸载,或者安装 vmware 16 官网下载地址 跳转 控制面板 — 程序与功能 — 查看已安装的更新 虚拟机安装 打开vmware…

【快速上手系列】使用阿里云发送测试短信超简单教程

【快速上手系列】使用阿里云发送测试短信超简单教程 步骤 一、阿里云配置 1、进入阿里云首页点击短信服务 2、短信服务界面 3、点击快速学习&#xff0c;然后绑定测试手机号&#xff0c;绑定好后点击调用API发送短信 4、左侧可以看到一些参数设置&#xff0c;右面是可以选择…

基于Ryu 防火墙的检测和解决异常入侵的流量

基于Ryu 防火墙的检测和解决异常入侵的流量基于Ryu 防火墙的检测和解决异常入侵的流量防火墙规则实验仿真环节&#xff1a;1.下载代码到本地2.安装相关依赖库3.设置openflow1.34.启动控制器异常检测&#xff1a;异常解决&#xff1a;规则合并&#xff1a;防火墙规则树&#xff…

(一)EasyExcel的使用(读取数据到实体类即绑定实体类)

最近遇到了一个excel简单的导入导出的需求&#xff0c;因此就对easyexcel第三方插件的使用做一点总结&#xff0c;大家可以看一看&#xff0c;可能会对你有点帮助。 目录 前言&#xff1a; 1、引入easyexcel相关依赖 2、创建对应excel的实体类 3、导入excel&#xff0c;并…