flink sql 源码走读 --- 解释flink jdbc where 条件为什么没有下推数据库

news2025/7/20 17:30:51

本文通过一个具体案例,说明 flink sql 如何实现 connector 加载、source/sink 端操作、数据库连接等。可以帮助大家了解其原理,并在代码中找到落库执行SQL生成逻辑,得到where条件并没有下推到库执行的结论。

案例如下:

create table mysql_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://${mysql_hosts}:${mysql_port}/sitrdw001?useSSL=false&useUnicode=true&characterEncoding=UTF-8',
'username' = '${mysql_username}',
'password' = '${mysql_pass}',
'scan.fetch-size'='1000',
'table-name' = 'test_12'
);

create table es_test_12 (
ID STRING,
NAME STRING,
primary key(ID) not enforced
) with (
'connector' = '${es_connector}',
'hosts' = '${es_hosts}',
'username' = '${es_username}',
'password' = '${es_pass}',
'index' = 'test_12'
);

insert into es_test_12
select
   *
from mysql_test_12
where ID = '20200604'
;

这是一个很简单的案例,source 端连接mysql数据库,sink 端连接 es,获取 ID = ‘20200604’ 的数据写入es。

一、整体概念

在这里插入图片描述
再具体展开之前,有必要熟悉一下flink sql的整体框架。
catalog:表目录的抽象,上面案例中的建表语句 create table mysql_test_12 … as … 就是一个catalog,主要包含库名、表名、列名、列数据类型等信息;

DynamicTableSourceFactory:每个connector 都会有一个固定的 factory 工厂,主要处理一些配置项(with 后面配置的选项,比如’scan.fetch-size’=‘1000’),做一些配置检查和封装工作,最终生成DynamicTableSource。

DynamicTableSource:数据源,在这里会创建可执行的 sql 语句,并生成 ScanRuntimeProvider 具体执行类。

ScanRuntimeProvider :sql执行的具体类,在这里执行 sql 查询,并提供数据查询/遍历 接口。

sink 跟source 比较类似,提供对外写出的能力,这里就不在展开。

二、创建 source 节点
案例中 ‘connector’ 配置的是 ‘jdbc’,那么 flink 是如何创建 jdbc 的source 节点呢?
在这里插入图片描述
CatalogSourceTable是创建source节点的入口类,可以看到这里创建了一个 JdbcDynamicTableSource 数据源,我们点击进去查看具体的实现方法,发现主要它主要调用的是 FactoryUtil 的 createTableSource 方法:
在这里插入图片描述从这段代码中 可以看出,flink先获取 DynamicTableSourceFactory,再调用factory.createDynamicTableSource(context) 方法得到具体的实现source。

关于factory的获取,感兴趣的可以继续debug深入了解,我简单概括一下主要逻辑:
1、系统加载 META-INF.services下所有继承 Factory 的类;
2、遍历每个factory,并调用factory的factoryIdentifier() 方法获取 标识 并进行匹配。比如 JdbcDynamicTableFactory 的 IDENTIFIER(标识符)是’jdbc’ ,刚好匹配上SQL中的connector。
3、找到 factory 后,调用 该factory的createDynamicTableSource() 方法返回source

至此,source节点创建完成。
在这里插入图片描述在这里插入图片描述
三、为什么 where 条件不支持下推数据库
想要了解where 条件有没有下推,我们需要去看SQL是如何创建的。DynamicTableSource(jdbc的实现类是JdbcDynamicTableSource)负责构建SQL,核心代码如下:

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // 用来执行 SQL的具体对象
        final JdbcRowDataInputFormat.Builder builder =
                JdbcRowDataInputFormat.builder()
                        .setDrivername(options.getDriverName())
                        .setDBUrl(options.getDbURL())
                        .setUsername(options.getUsername().orElse(null))
                        .setPassword(options.getPassword().orElse(null))
                        .setAutoCommit(readOptions.getAutoCommit());

        // 设置 fetch-size
        if (readOptions.getFetchSize() != 0) { 
            builder.setFetchSize(readOptions.getFetchSize());
        }
        final JdbcDialect dialect = options.getDialect();
        
        // 通过schema 生成 select 语句。对照案例,query = "SELECT `ID`, `NAME` FROM `test_12`"
        String query =
                dialect.getSelectFromStatement(
                        options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
        // 如果设置了分区扫描,在sql 后面拼接 where {scan.partition.column} BETWEEN ? AND ?
        if (readOptions.getPartitionColumnName().isPresent()) {
            long lowerBound = readOptions.getPartitionLowerBound().get();
            long upperBound = readOptions.getPartitionUpperBound().get();
            int numPartitions = readOptions.getNumPartitions().get();
            builder.setParametersProvider(
                    new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
                            .ofBatchNum(numPartitions));
            query +=
                    " WHERE "
                            + dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
                            + " BETWEEN ? AND ?";
        }
        // 设置limit
        if (limit >= 0) {
            query = String.format("%s %s", query, dialect.getLimitClause(limit));
        }
        builder.setQuery(query);
        final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
        builder.setRowConverter(dialect.getRowConverter(rowType));
        builder.setRowDataTypeInfo(
                runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType()));

        return InputFormatProvider.of(builder.build());
    }

通过上面的代码逻辑,不难看出,SQL 主要是根据 schema和 scan.partition来生成的,并没有拼接 where ID = ‘20200604’。where操作应该是在内存中通过 filter 算子进行过滤。

由此可能造成的问题是,即使只需要处理一条数据,flink sql 也会把 test_12 所有的数据加载到内存中,如果遇到大表,会造成处理性能下降的后果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/395353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Zookeeper集群和Hadoop集群安装(保姆级教程)

1. HA HA(Heigh Available)高可用 解决单点故障,保证企业服务 7*24 小时不宕机单点故障:某个节点宕机导致整个集群的宕机 Hadoop 的 HA NameNode 存在单点故障的可能,需要配置 HA 解决引入第二个 NameNode 作为备份同…

C语言学习(三)

#include <stdio.h> int main(void){int a; scanf("%d",&a); printf("%d",a); return 0&#xff1b;} 正在上传…重新上传取消正在上传…重新上传取消&符号作用是把键盘中输入的值给变量a,使用scanf()时输入数值&#xff0c;需要按一下enter…

MySQL知识点全面总结2:Mysql语句的执行(DDL DML 完整性约束)

二.MySQL知识点全面总结2&#xff1a;mysql语句的执行&#xff08;DDL DML 完整性约束&#xff09; 1.对数据库的操作&#xff08;DDL&#xff09; 2.对数据表的操作&#xff08;DDL&#xff09; 3.对数据表字段的操作&#xff08;DDL&#xff09; 4.对数据表内容的增删改&…

Science Direct 高级检索、使用技巧、寻找目标期刊、E lsevi er 投稿指南

ScienceDirect使用教程&E ls evier投稿指南1. 高级检索功能1.1 布尔值运算符和短语搜索&#xff08;构建检索式&#xff09;1.2 搜索特殊字符和公式1.3 非搜索用词2. 如何使用搜索结果页面&#xff1f;3. 下载多个文档3.1 从搜索结果列表中下载多个文档3. 利用 JournalFind…

java高性能并发计数器之巅峰对决

并发计数器各个方案介绍方案概述1. jdk5提供的原子更新长整型类 AtomicLong2. synchronized3. jdk8提供的 LongAdder 【单机推荐】4. Redisson分布式累加器【分布式推荐】方案介绍jdk5提供的原子更新长整型类 AtomicLong在JDK1.5开始就新增了并发的Integer/Long的操作工具类Ato…

VS2022+qt5.14.2配置qcustomplot2.1.16(坑很多)

目录1. 前言2. 下载QCustomPlot的源码3. 将QCustomPlot添加到QT项目&#xff08;一些遇到的问题&#xff09;1. 前言 QCustomPlot 是一个超强超小巧的qt绘图类&#xff0c;非常漂亮&#xff0c;非常易用&#xff0c;只需要加入一个qcustomplot.h和qcustomplot.cpp文件即可使用…

Android 基础知识4-3.7 ProgressBar(进度条)详解

一、简介 ProgressBar&#xff08;进度条&#xff09;是UI界面中一种非常实用的组件&#xff0c;通常用于向用户像是某个耗时操作完成的百分比。进度条可动态地显示进度&#xff0c;因此避免长时间地执行某个操作时&#xff0c;让用户感觉程序失去了响应&#xff0c;从而更好地…

常用反弹 shell 方式总结

在对 Linux 系统进行渗透测试中&#xff0c;常常会用到反弹 shell 的操作&#xff0c;今天来总结一下常用反弹 shell 的方式。 什么是反弹shell&#xff1a; 反弹shell&#xff0c;就是攻击机监听在某个TCP/UDP端口为服务端&#xff0c;目标机主动发起请求到攻击机监听的端口…

Spark 配置项

Spark 配置项硬件资源类CPU内存堆外内User Memory/Spark 可用内存Execution/Storage Memory磁盘ShuffleSpark SQLJoin 策略调整自动分区合并自动倾斜处理配置项分为 3 类: 硬件资源类 : 与 CPU、内存、磁盘有关的配置项Shuffle 类 : Shuffle 计算过程的配置项Spark SQL : Spar…

MySQL 中的 distinct 和 group by 的性能比较

1 先说大致的结论&#xff08;完整结论在文末&#xff09;在语义相同&#xff0c;有索引的情况下&#xff1a;group by和 distinct 都能使用索引&#xff0c;效率相同。在语义相同&#xff0c;无索引的情况下&#xff1a;distinct 效率高于group by。原因是 distinct 和 group …

【软件开发】基于PyQt5开发的标注软件

这里是基于PyQt5写的面向目标检测的各类标注PC端软件系统。目前现有的labelme软件和labelImg开源软件无法满足特殊数据集的标注要求&#xff0c;而且没有标注顺序的报错提示。当然我设计的软件就会不具有适用性了&#xff08;毕竟从下面开发的软件可以明显看出来我做的基本上是…

spark-submit报错

spark-submit --class ${main_class} \ --master yarn \ --deploy-mode client \ --driver-memory 8g \ --executor-memory 6g \ --num-executors 1 \ --executor-cores 34 \ ${HOME_PATH}/obs_finance-1.0-SNAPSHOT-jar-with-dependencies.jar ${year_month} 注意这个34超过…

【教学典型案例】28.单表的11个Update接口--MyBatis

目录一&#xff1a;背景介绍二&#xff1a;前期准备引入pom依赖MyBatis配置文件数据库连接文件MyBatis配置类三&#xff1a;代码编写Mapper编写接口通用mapper实体pojojunit测试编写测试结果四&#xff1a;总结一&#xff1a;背景介绍 在进行项目开发编写更新接口时&#xff0…

顶级动漫IP加持之下,3A策略游戏Mechaverse如何改变GameFi

2021年是元宇宙发展的元年&#xff0c;元宇宙与GameFi创造了一波又一波市场热点。在经历第一波热潮之后&#xff0c;元宇宙的到来让不少人看到了加密市场的潜力&#xff0c;同时大家也意识到这将是未来的重要方向。如何将元宇宙推向更广阔的市场&#xff0c;让更多人能够轻松进…

项目管理工具DHTMLX Gantt灯箱元素配置教程:显示任务内容

DHTMLX Gantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表。可满足项目管理应用程序的大部分开发需求&#xff0c;具备完善的甘特图图表库&#xff0c;功能强大&#xff0c;价格便宜&#xff0c;提供丰富而灵活的JavaScript API接口&#xff0c;与各种服务器端技术&am…

神经网络(容易被忽视的基础知识)

主要内容&#xff1a; 基本神经元作为线性分类器的单个神经元为什么要是深度神经网而不是”肥胖“神经网络&#xff1f;为什么在人工神经网络中的神经元需要激活函数&#xff1f;一些主要的激活函数神经网络中的偏置有什么意义&#xff1f;初始化神经网络的参数可以全为0吗&am…

【java】java基本类型和包装类型的区别

文章目录简介1.包装类型可以为 null&#xff0c;而基本类型不可以2.包装类型可用于泛型&#xff0c;而基本类型不可以3.基本类型比包装类型更高效4.自动装箱和自动拆箱简介 Java 的每个基本类型都对应了一个包装类型&#xff0c;比如说 int 的包装类型为 Integer&#xff0c;d…

ECOLOY直接更换流程表单后导致历史流程中数据为空白的解决方案

用户反馈流历史流程打开是空白了没有内容。 一、问题调查分析&#xff1a; 工作流“XX0204 员工培训协议审批流程”workflowId37166产生的7个具体流程中&#xff0c;创建日期为2021年的4个具体流程原先引用的数据库表单应该是“劳动合同签订审批表”(formtable_main_190)&…

JavaScript-缓存

参考资料彻底解决让用户清一下浏览器缓存浏览器缓存彻底理解浏览器的缓存机制彻底弄懂前端缓存浅解强缓存和协商缓存浏览器缓存策略(强缓存和协商缓存)一文搞定Http缓存-强制缓存与协商缓存前端浏览器缓存知识梳理ASP.NET Core 中使用缓存IIS中设置Cache-Control是什么当我们第…

[Gin]框架底层实现理解(一)

前言&#xff1a;路由原理———压缩字典 这边简单讲一下gin非常重要的一个基点&#xff0c;也就是他作为go web框架的一个亮点 也就是Trie树和压缩字典算法 gin 通过树来存储路由&#xff0c;讲路由的字符拆解为一个个的结点&#xff0c;在获取handler函数时&#xff0c;会…