《ShardingSphere解读》18 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(上)

news2026/3/23 11:58:55
在上一篇中我们对 ShardingGroupExecuteCallback 和 SQLExecuteTemplate 做了介绍。从设计上讲前者充当 ShardingExecuteEngine 的回调入口而后者则是一个模板类完成对 ShardingExecuteEngine 的封装并提供了对外的统一入口这些类都位于底层的 sharding-core-execute 工程中。从今天开始我们将进入到 sharding-jdbc-core 工程来看看 ShardingSphere 中执行引擎上层设计中的几个核心类。AbstractStatementExecutor如上图所示根据上一篇中的执行引擎整体结构图可以看到SQLExecuteTemplate的直接使用者是AbstractStatementExecutor 类今天我们就从这个类开始展开讨论该类的变量比较多我们先来看一下//数据库类型 private final DatabaseType databaseType; //JDBC中用于指定结果处理方式的 resultSetType private final int resultSetType; //JDBC中用于指定是否可对结果集进行修改的 resultSetConcurrency private final int resultSetConcurrency; //JDBC中用于指定事务提交或回滚后结果集是否仍然可用的 resultSetConcurrency private final int resultSetHoldability; //分片 Connection private final ShardingConnection connection; //用于数据准备的模板类 private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate; //SQL 执行模板类 private final SQLExecuteTemplate sqlExecuteTemplate; //JDBC的Connection列表 private final Collection connections new LinkedList(); //SQLStatement 上下文 private SQLStatementContext sqlStatementContext; //参数集 private final List- parameterSets new LinkedList(); //JDBC的Statement 列表 private final List statements new LinkedList(); //JDBC的ResultSet 列表 private final List resultSets new CopyOnWriteArrayList(); //ShardingExecuteGroup 列表 private final Collection executeGroups new LinkedList();从这个类开始我们会慢慢接触 JDBC 规范相关的对象因为 ShardingSphere 的设计目标是重写一套与目前的 JDBC 规范完全兼容的体系。这里我们看到的 Connection、Statement 和 ResultSet 等对象以及 resultSetType、resultSetConcurrency、resultSetHoldability 等参数都是属于 JDBC 规范中的内容我们在注释上做了特别的说明你对此也都比较熟悉。而像 ShardingSphere 自己封装的 ShardingConnection 对象也很重要我们已经在《03 | 规范兼容JDBC 规范与 ShardingSphere 是什么关系》中对这个类的实现方式以及如何兼容 JDBC 规范的详细过程做了介绍。在 AbstractStatementExecutor 中这些变量的展开会涉及很多 sharding-jdbc-core 代码工程关于数据库访问相关的类的介绍包括我们以前已经接触过的 ShardingStatement 和 ShardingPreparedStatement 等类所以我们在展开 AbstractStatementExecutor 类的具体实现方法之前需要对这些类有一定的了解。在 AbstractStatementExecutor 构造函数中我们发现了上一篇中介绍的执行引擎 ShardingExecuteEngine 的创建过程并通过它创建了 SQLExecuteTemplate 模板类相关代码如下所示public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) { … ShardingExecuteEngine executeEngine connection.getRuntimeContext().getExecuteEngine(); sqlExecuteTemplate new SQLExecuteTemplate(executeEngine, connection.isHoldTransaction()); }同时AbstractStatementExecutor 中如下所示的 cacheStatements 方法也很有特色该方法会根据持有的 ShardingExecuteGroup 类分别填充 statements 和 parameterSets 这两个对象以供 AbstractStatementExecutor 的子类进行使用protected final void cacheStatements() { for (ShardingExecuteGroup each : executeGroups) { statements.addAll(Lists.transform(each.getInputs(), new Function() { Override public Statement apply(final StatementExecuteUnit input) { return input.getStatement(); } })); parameterSets.addAll(Lists.transform(each.getInputs(), new Function() { Override public List apply(final StatementExecuteUnit input) { return input.getRouteUnit().getSqlUnit().getParameters(); } })); } }注意这里在实现方式上使用了 Google 提供的 Guava 框架中的 Lists.transform 方法从而完成了不同对象之间的转换过程这种实现方式在 ShardingSphere 中应用广泛非常值得你学习。然后我们来看 AbstractStatementExecutor 中最核心的方法即执行回调的 executeCallback 方法protected final List executeCallback(final SQLExecuteCallback executeCallback) throws SQLException { List result sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback); refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext); return result; }显然在这里应该使用 SQLExecuteTemplate 模板类来完成具体回调的执行过程。同时我可以看到这里还有一个 refreshMetaDataIfNeeded 辅助方法用来刷选元数据。AbstractStatementExecutor 有两个实现类一个是普通的 StatementExecutor一个是 PreparedStatementExecutor接下来我将分别进行讲解。StatementExecutor我们来到 StatementExecutor先看它的用于执行初始化操作的 init 方法public void init(final SQLRouteResult routeResult) throws SQLException { setSqlStatementContext(routeResult.getSqlStatementContext()); getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits())); cacheStatements(); }这里的 cacheStatements 方法前面已经介绍过而 obtainExecuteGroups 方法用于获取所需的 ShardingExecuteGroup 集合。要实现这个方法就需要引入 SQLExecutePrepareTemplate 和对应的回调 SQLExecutePrepareCallback。1.SQLExecutePrepareCallback从命名上看让人感觉 SQLExecutePrepareTemplate 和 SQLExecuteTemplate 应该是一对尤其是名称中有一个“Prepare”让人联想到 PreparedStatement。但事实上SQLExecutePrepareTemplate 与 SQLExecuteTemplate 没有什么关联它也不是像 SQLExecuteTemplate 一样提供了 ShardingExecuteEngine 的封装而是主要关注于 ShardingExecuteGroup 数据的收集和拼装换句话说是为了准备Prepare数据。在 SQLExecutePrepareTemplate 中核心的功能就是下面这个方法该方法传入了一个 SQLExecutePrepareCallback 对象并返回 ShardingExecuteGroup 的一个集合public Collection getExecuteUnitGroups(final Collection routeUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(routeUnits, callback); }为了构建这个集合SQLExecutePrepareTemplate 实现了很多辅助方法同时它还引入了一个 SQLExecutePrepareCallback 回调来完成 ShardingExecuteGroup 数据结构中部分数据的填充。SQLExecutePrepareCallback 接口定义如下可以看到 Connection 和 StatementExecuteUnit 这两个对象是通过回调来创建的public interface SQLExecutePrepareCallback { //获取 Connection 列表 List getConnections(ConnectionMode connectionMode, String dataSourceName, int connectionSize) throws SQLException; //获取 Statement 执行单元 StatementExecuteUnit createStatementExecuteUnit(Connection connection, RouteUnit routeUnit, ConnectionMode connectionMode) throws SQLException; }当我们获取了想要的 ShardingExecuteGroup 之后相当于完成了 StatementExecutor 的初始化工作。该类中剩下的就是一系列以“execute”开头的 SQL 执行方法包括 executeQuery、executeUpdate以及它们的各种重载方法。我们先来看用于查询的 executeQuery 方法public List executeQuery() throws SQLException { final boolean isExceptionThrown ExecutorExceptionHandler.isExceptionThrown(); //创建 SQLExecuteCallback 并执行查询 SQLExecuteCallback executeCallback new SQLExecuteCallback(getDatabaseType(), isExceptionThrown) { Override protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(sql, statement, connectionMode); } }; //执行 SQLExecuteCallback 并返回结果 return executeCallback(executeCallback); }我们已经在上一篇中介绍过这个方法我们知道 SQLExecuteCallback 实现了 ShardingGroupExecuteCallback 接口并提供了 executeSQL 模板方法。而在上述 executeQuery 方法中executeSQL 模板方法的实现过程就是调用如下所示的 getQueryResult 方法private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { //通过 Statement 执行 SQL 并获取结果 ResultSet resultSet statement.executeQuery(sql); getResultSets().add(resultSet); //根据连接模式来确认构建结果 return ConnectionMode.MEMORY_STRICTLY connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet); }2.ConnectionModegetQueryResult 方法中完全基于 JDBC 中的 Statement 和 ResultSet 对象来执行查询并返回结果。但是这里也引入了 ShardingSphere 执行引擎中非常重要的一个概念即ConnectionMode连接模式它是一个枚举public enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }可以看到有两种具体的连接模式MEMORY_STRICTLY 和 CONNECTION_STRICTLY。MEMORY_STRICTLY 代表内存限制模式CONNECTION_STRICTLY 代表连接限制模式。ConnectionMode连接模式是 ShardingSphere 所提出的一个特有概念背后体现的是一种设计上的平衡思想。从数据库访问资源的角度来看一方面是对数据库连接资源的控制保护另一方面是采用更优的归并模式达到对中间件内存资源的节省如何处理好两者之间的关系是 ShardingSphere 执行引擎需求解决的问题。为此ShardingSphere 提出了连接模式的概念简单举例说明当采用内存限制模式时对于同一数据源如果有 10 张分表那么执行时会获取 10 个连接并进行并行执行而当采用连接限制模式时执行过程中只会获取 1 个连接而进行串行执行。那么这个 ConnectionMode 是怎么得出来的呢实际上这部分代码位于 SQLExecutePrepareTemplate 中我们根据 maxConnectionsSizePerQuery 这个配置项以及与每个数据库所需要执行的 SQL 数量进行比较然后得出具体的 ConnectionModeConnectionMode connectionMode maxConnectionsSizePerQuery type) throws SQLException; Object getCalendarValue(int columnIndex, Class type, Calendar calendar) throws SQLException; InputStream getInputStream(int columnIndex, String type) throws SQLException; boolean wasNull() throws SQLException; int getColumnCount() throws SQLException; String getColumnLabel(int columnIndex) throws SQLException; boolean isCaseSensitive(int columnIndex) throws SQLException; }如上图所示我们可以看到如果每个数据库连接所指向的 SQL 数多于一条时走的是内存限制模式反之走的是连接限制模式。3.StreamQueryResult VS MemoryQueryResult在了解了 ConnectionMode连接模式 的设计理念后我们再来看 StatementExecutor 的 executeQuery 方法返回的是一个 QueryResult。在 ShardingSphere中QueryResult 接口存在于 StreamQueryResult代表流式归并结果和 MemoryQueryResult 代表内存归并结果这两个实现类。ShardingSphere 采用这样的设计实际上跟前面介绍的 ConnectionMode 有直接关系。我们知道在内存限制模式中ShardingSphere 对一次操作所耗费的数据库连接数量不做限制而当采用连接限制模式时ShardingSphere严格控制对一次操作所耗费的数据库连接数量。基于这样的设计原理如上面的 ConnectionMode 的计算示意图所示在 maxConnectionSizePerQuery 允许的范围内当一个连接需要执行的请求数量大于 1 时意味着当前的数据库连接无法持有相应的数据结果集则必须采用内存归并反之则可以采用流式归并。StreamQueryResult我们通过对比 StreamQueryResult 和 MemoryQueryResult 的实现过程对上述原理做进一步分析在 StreamQueryResult 中它的 next 方法非常简单Override public boolean next() throws SQLException { return resultSet.next(); }显然这是一种流式处理的方式从 ResultSet 中获取下一个数据行。MemoryQueryResult我们再来看 MemoryQueryResult在它的构造函数中通过 getRows 方法把 ResultSet 中的全部数据行先进行获取并存储在内存变量 rows 中private Iterator getRows(final ResultSet resultSet) throws SQLException { Collection result new LinkedList(); while (resultSet.next()) { List rowData new ArrayList(resultSet.getMetaData().getColumnCount()); for (int columnIndex 1; columnIndex executeCallback new SQLExecuteCallback(getDatabaseType(), isExceptionThrown) { Override protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { //使用 Executor 进行执行 return executor.execute(statement, sql); } }; List result executeCallback(executeCallback); if (null result || result.isEmpty() || null result.get(0)) { return false; } return result.get(0); }这里多嵌套一层的目的是更好地分离代码的职责并对执行结果进行处理同样的处理技巧在 StatementExecutor 的 executeUpdate 方法中也有体现。PreparedStatementExecutor讲完 StatementExecutor 之后我们来看 PreparedStatementExecutor。PreparedStatementExecutor 包含了与 StatementExecutor 一样的用于初始化的 init 方法。然后我们同样来看它如下所示的 executeQuery 方法可以看到这里的处理方式与在 StatementExecutor 的一致public List executeQuery() throws SQLException { final boolean isExceptionThrown ExecutorExceptionHandler.isExceptionThrown(); //创建 SQLExecuteCallback 并执行 SQLExecuteCallback executeCallback new SQLExecuteCallback(getDatabaseType(), isExceptionThrown) { Override protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return getQueryResult(statement, connectionMode); } }; return executeCallback(executeCallback); }然后我们再来看它的 execute 方法就会发现有不同点public boolean execute() throws SQLException { boolean isExceptionThrown ExecutorExceptionHandler.isExceptionThrown(); SQLExecuteCallback executeCallback SQLExecuteCallbackFactory.getPreparedSQLExecuteCallback(getDatabaseType(), isExceptionThrown); List result executeCallback(executeCallback); if (null result || result.isEmpty() || null result.get(0)) { return false; } return result.get(0); }与 StatementExecutor 不同PreparedStatementExecutor 在实现 execute 方法时没有设计类似 Executor 这样的接口而是直接提供了一个工厂类 SQLExecuteCallbackFactorypublic final class SQLExecuteCallbackFactory { … public static SQLExecuteCallback getPreparedSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) { return new SQLExecuteCallback(databaseType, isExceptionThrown) { Override protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException { return ((PreparedStatement) statement).execute(); } }; } }注意到这里的静态方法 getPreparedSQLExecuteCallback 也就是返回了一个 SQLExecuteCallback 回调的实现而在这个实现中使用了 JDBC 底层的 PreparedStatement 完成具体 SQL 的执行过程。至此我们对 ShardingSphere 中两个主要执行器 StatementExecutor 和 PreparedStatementExecutor 都进行了详细介绍。从源码解析到日常开发本篇关于两种 QueryResult 的设计思想同样可以应用到日常开发中。当我们面对如何处理来自数据库或外部数据源的数据时可以根据需要设计流式访问方式和内存访问方式这两种访问方式在数据访问过程中都具有一定的代表性。通常我们会首先想到将所有访问到的数据存放在内存中再进行二次处理但这种处理方式会面临性能问题流式访问方式性能更高但需要我们挖掘适合的应用场景。小结与预告今天介绍了 ShardingSphere 执行引擎主题的第二个篇我们重点围绕执行引擎中的执行器展开讨论给出了 StatementExecutor 和 PreparedStatementExecutor 这两种执行器的实现方式也给出了 ShardingSphere 中关于连接模式的详细讨论。这里给大家留一道思考题ShardingSphere 中连接模式的概念和作用是什么欢迎你在留言区与大家讨论从类层结构而言StatementExecutor 和 PreparedStatementExecutor 都属于底层组件在下一篇我们会介绍包括 ShardingStatement 和 PreparedShardingStatement 在内的位于更加上层的执行引擎组件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2440289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…