Spark--一文了解SparkSql的Join策略
文章目录前言一、join 基本要素二、join 实现三、五种join 策略3.1 2 种数据分发模式数据怎么到同一个节点3.1.1 Broadcast Join广播 Join也叫 Map Join3.1.2 Shuffle Join重分区 Join也叫 Reduce Join3.2 3 种 Join 实现机制数据在本地怎么匹配3.2.1 Sort Merge Join排序合并 JoinSpark/Hive 默认的 Shuffle Join 实现3.2.2 Hash Join哈希 Join3.2.3 Nested Loop Join嵌套循环 Join3.3 详解五种join策略3.3.1 Broadcast Hash Join (BHJ)3.3.2 Shuffle Hash JoinSHJ3.3.3 Shuffle Sort Merge Join (SMJ)3.3.4 Cartesian product join3.3.5 Broadcast nested loop join (BNLJ)四、 Spark 如何选择 Join 策略五、其他问题5.1 为什么先判断是否等值连接5.2 join操作一定发生shuffle吗5.3 Spark如何决定是否使用广播连接Broadcast Join5.4 为什么我的表比广播阈值小但是没有走广播join5.5 Broadcast Join 一定比 Shuffle Join 快5.6 广播表的大小有限制吗5.7 spark shuffle 2.0以上已经不用hash shuffle了那join的时候还用hash join 么5.8 为什么是5种join策略而不是6种六、优化案例6.1 案例1-SortMergeJoin改写BroadcastHashJoin前言Join连接是大数据处理的重要手段它基于表之间的共同字段将来自两个或多个表的行结合起来。但是我们真的了解join吗join都有哪些是怎么实现的?一、join 基本要素包括三个要素Join方式、Join条件以及过滤条件Spark支持所有类型的Join7种包括inner joinleft outer joinright outer joinfull outer joincross joinleft semi join左半连接是 IN/EXISTS 子查询的一种更高效的实现特点右表只能在 on 子句中设置过滤条件不能在 where 子句过滤。select 结果只许出现左表的那些列只传递右表的 join key 给 map 阶段右表有重复值的情况下left semi join 只产生一条join 会产生多条遇到右表重复记录左表会跳过而 join 则会一直遍历主要用途:过滤右表记录替代性能较差的子查询可参考left anti join左反连接是not in/ not exists 子查询的一种更高效的实现null 值呢主要用途这里有一个优化案例二、join 实现Join的基本实现流程如下图所示Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter)通常streamIter为大表buildIter为小表我们不用担心哪个表为streamIter哪个表为buildIter这个spark会根据join语句自动帮我们完成。实现步骤1.基于streamIter来遍历每次取出streamIter中的一条记录rowA2.根据Join条件计算keyA3.根据该keyA去buildIter中查找所有满足Join条件(keyBkeyA)的记录rowBs4.将rowBs中每条记录分别与rowA join得到join后的记录5.根据过滤条件得到最终join的记录。三、五种join 策略在 Spark 的物理计划physical plan阶段Spark 的 JoinSelection 类会根据是否等值连接、Join hints 策略、Join 表的大小以及参与 Join 的 key 是否可以排序等条件来选择最终的 Join 策略join strategies最后 Spark 会利用选择好的 Join 策略执行最终的计算来自2种数据分发方式广播和Shuffle与3种Join实现机制Hash Joins、Sort Merge Joins和Nested Loop Joins的排列组合分发方式决定了数据怎么到同一个节点广播小表shuffle相同 Join Key分发到同一节点join实现机制决定了数据在本地怎么匹配Sort Merge Join排序合并)Hash Join小表构建哈希表Nested Loop Join嵌套循环 Join嵌套遍历小表没有Broadcast Sort Merge Join广播SMJ策略没必要本身被广播的数据集就比较小hash join 和 NLJ 完全够用了远比SMJ的机制的执行效率更高而且SMJ本身是针对大表关联大表的join算法。。当前 SparkApache Spark 3.0一共支持五种 Join 策略Broadcast hash join (BHJ)对应物理操作符BroadcastHashJoinExec性能最优分发方式广播Join实现机制Hash JoinsShuffle hash joinSHJ对应物理操作符ShuffledHashJoinExec性能次优分发方式ShuffleJoin实现机制Hash JoinsShuffle sort merge join (SMJ)对应物理操作符SortMergeJoinExec性能次优分发方式ShuffleJoin实现机制Sort Merge JoinsShuffle-and-replicate nested loop join又称笛卡尔积Cartesian product join(CPJ))对应物理操作符CartesianProductExec性能最差分发方式ShuffleJoin实现机制Nested Loop JoinsBroadcast nested loop join (BNLJ)对应物理操作符BroadcastNestedLoopJoinExec性能最差分发方式广播Join实现机制Nested Loop Joins其中 BHJ 和 SMJ 这两种 Join 策略是我们运行 Spark 作业最常见的。JoinSelection 的选择会先根据 Join 的 Key 为等值 Join来选择Broadcast hash join、Shuffle hash join 以及 Shuffle sort merge join中的一个如果 Join 的 Key 为不等值 Join或者没有指定 Join 条件则会选择Broadcast nested loop join或Shuffle-and-replicate nested loop join。不同的 Join 策略在执行上效率差别很大我们了解每种 Join 策略的执行过程和适用条件是很有必要的。Join策略Join实现类分发类型Join形式含义特点BHJBroadcastHashJoinExecbroadcasthashJoin(hash表)在小表构建hash表通过把小表广播的方式完成关联1、仅持等值连接join key不需要排序2、支持除了全外连接(full outer joins)之外的所有join类型3、Broadcast Hash Join相比其他的JOIN机制而言效率更高4、被广播的小表的数据量要小于spark.sql.autoBroadcastJoinThreshold值默认是10MB(10485760)对被广播的表构建Hash map如果数据比较大可能会造成OOM5、被广播表的大小阈值不能超过8GB总行数不能超过3.4亿6、基表不能被broadcast比如左连接时只能将右表进行广播SHJShuffledHashJoinExecshufflehashJoin(hash表)先Shuffle再构建hash表用Hash Join方式完成关联1、仅支持等值连接join key不需要排序2、支持除了全外连接(full outer joins)之外的所有join类型3、需要对小表构建Hash map如果构建Hash表的一侧数据比较大可能会造成OOMSMJSortMergeJoinExecshuffleSortMergeJoin(排序合并)先Shuffle再排序用Sort Merge Join方式完成关联1、仅支持等值连接2、支持所有join类型3、Join Keys需排序CPJCartesianProductExecShuffleNestedLoopJoin(嵌套循环)先Shuffle然后NLJ方式完成关联1、仅支持inner join2、支持等值和不等值连接 3、开启参数spark.sql.crossJoin.enabeBNLJBroadcastNestedLoopJoinExecbroadcastNestedLoopJoin(嵌套循环)将小表广播到所有Executors用NLJ方式完成关联1、支持等值和非等值连接2、支持所有的JOIN类型主要优化点如下:当右外连接时要广播左表当左外连接时要广播右表当内连接时要广播左右两张表3.12 种数据分发模式数据怎么到同一个节点3.1.1 Broadcast Join广播 Join也叫 Map Join核心原理把小表全量广播到所有 Executor 节点在每个 Executor 内存中存一份小表的完整副本大表完全不需要 Shuffle直接在本地读取自己的分片和小表的副本做本地 Join。适用场景一张表极小另一张表极大这是核心判断标准。小表能完全加载到 Executor 内存不会 OOM核心优劣势优势劣势1.完全避免大表的 Shuffle这是性能最高的 Join 方式没有之一大表不需要跨节点传输IO / 网络开销几乎为 02. 实现简单执行稳定1.小表不能太大否则会占用大量 Executor 内存甚至直接 OOM2. 小表更新时需要重新广播所有节点不适合小表频繁更新的场景3.1.2 Shuffle Join重分区 Join也叫 Reduce Join核心原理两张表都按 Join Key 做哈希重分区相同 Join Key 的数据会被发送到同一个 Executor 节点然后在每个 Executor 本地对两张表的对应分区做 Join。适用场景两张表都很大都≥GB 级甚至 TB/PB 级没有小表可以广播核心优劣势优势劣势1.能处理任意大的两张表是大表大表 Join 的唯一通用解2.支持所有 Join 类型Inner/Left/Right/Full Outer1.需要全量 Shuffle 两张表数据传输量大IO / 网络 / CPU 开销极高性能比 Broadcast Join 差很多2.容易出现数据倾斜导致单个 Task 运行时间极长甚至 OOM3.23 种 Join 实现机制数据在本地怎么匹配3.2.1 Sort Merge Join排序合并 JoinSpark/Hive 默认的 Shuffle Join 实现核心原理分两步执行排序阶段在每个 Executor 本地对两张表的对应分区按 Join Key 做排序合并阶段用双指针法遍历两张排序后的表同时移动两个指针匹配相同的 Join Key。适用场景两张表都很大无法走 Broadcast JoinJoin Key 是有序的或者数据已经按 Join Key 分桶 / 排序可以跳过排序阶段性能大幅提升数据分布比较均匀没有严重的数据倾斜核心优劣势优势劣势1.内存占用稳定不需要把整张表加载到内存只需要双指针遍历内存占用可控2.性能稳定适合大规模数据不会因为数据量突然变大而 OOM3.支持所有 Join 类型1.需要排序排序开销大尤其是数据量极大时2.不适合数据倾斜严重的场景倾斜的 Key 会导致单个分区数据量过大排序和合并都很慢3.2.2 Hash Join哈希 Join核心原理分两步执行构建哈希表阶段选择一张较小的表或分区 作为「构建表」全量加载到 Executor 内存构建一个哈希表Hash TableKey 是 Join KeyValue 是对应的数据行探测阶段遍历另一张「探测表或分区」用每一行的 Join Key 去哈希表里查找匹配找到就输出结果。适用场景一张表比较小能完全加载到内存通常和 Broadcast Join 组合使用叫「Broadcast Hash Join」数据分布比较均匀哈希冲突少不需要 Full Outer JoinHash Join 不支持 Full Outer核心优劣势优势劣势1.不需要排序性能比 Sort Merge Join 高尤其是小表大表的场景2.查找速度快哈希查找是 O (1) 的时间复杂度匹配效率极高1.内存占用高需要把构建表全量加载到内存构建表太大会 OOM2.不支持 Full Outer Join3.数据倾斜严重时性能暴跌倾斜的 Key 会导致哈希表膨胀甚至 OOM3.2.3 Nested Loop Join嵌套循环 Join核心原理最简单的 Join 算法分两步执行遍历外层表小表 的每一行对于外层表的每一行遍历内层表大表 的所有行匹配相同的 Join Key。适用场景两张表都非常小比如只有几千行、几万行内层表有索引比如数据库的 B 树索引或者数仓的分桶 / 排序键可以快速查找不用全表扫描核心优劣势优势劣势1. 实现简单2. 小表小表 Join 时性能不错3. 内层表有索引时查找速度快1.时间复杂度极高是 O (M*N)两张表稍微大一点比如几十万行性能就会指数级下降2.完全不适合大表 Join大表 Join 会跑几个小时甚至直接跑挂3.3 详解五种join策略3.3.1 Broadcast Hash Join (BHJ)又称 map-side-only join从名字可以看出Join 是在map端进行的。这种 Join 要求一张表很小小到足以将表的数据全部放到 Driver 和 Executor 端的内存中背后的思想是在driver端将小规模的数据集广播给每个executor的成本低于reshuffle或者重新排序所需的成本适用条件只能用于等值 Join不要求参与 Join 的 keys 可排序小表的数据必须小于广播阀值可以通过 spark.sql.autoBroadcastJoinThreshold 参数来配置默认是 10MB如果你的内存比较大可以将这个阈值适当加大如果将spark.sql.autoBroadcastJoinThreshold参数设置为-1可以关闭 BHJ除了 full outer joins 支持所有的 Join 类型。被广播表的大小阈值不能超过8GB总行数不能超过3.4亿实现将小表的数据广播broadcast到 Spark 所有的 Executor 端这个广播过程和我们自己去广播数据没什么区别先利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端然后在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端然后在 Executor 端这个广播出去的数据会和大表进行 Join 操作这种 Join 策略避免了 Shuffle 操作。一般而言Broadcast Hash Join 会比其他 Join 策略执行的要快但这个也不是一定的可以看下这篇文章在数据量一样的情况下如果 core 的个数比较多Shuffle Join 是有优势的如果非广播的表数据量数据量越来越大Broadcast Join 是有优势的如果加大广播表的数据量Driver 端的 Broadcast 是跑不出结果Executor 的 Broadcast Join 是比较快的。3.3.2 Shuffle Hash JoinSHJ大表和小表进行 Join 的时候选择的一种策略适用条件1.仅支持等值 Join不要求参与 Join 的 Keys 可排序2.spark.sql.join.preferSortMergeJoin参数必须设置为false参数是从 Spark 2.0.0 版本引入的默认值为 true会选择 Sort Merge Join3.至少有一个连接数据集需要小到足以建立一个hash table使得较小的数据集能够加载到内存中其大小应该小于广播阈值spark.sql.autoBroadcastJoinThreshold和shuffle分区数的乘积。4.同时较小的数据集需要比较大数据集至少小3倍否则基于排序的连接策略可能收益更大。实现把大表和小表按照相同的分区算法和分区数进行分区根据参与 Join 的 keys 进行分区这样就保证了 hash 值一样的数据都分发到同一个分区中然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 了。在进行 Join 之前还会对小表 hash 完的分区构建hash map。Shuffle hash join 利用了分治思想把大问题拆解成小问题去解决。3.3.3Shuffle Sort Merge Join (SMJ)参与 Join 的表都很大适用条件仅支持等值 Join参与 Join 的Keys可排序连接数据集没有大小限制。因为不需要将任何连接数据集装入内存这一点与Shuffle Hash Join策略连接不同我们可以对参与 Join 的表按照 Keys 进行 Bucket 来避免 Shuffle Sort Merge Join 的 Shuffle 操作因为 Bucket 的表事先已经按照 Keys 进行分区排序所以做 Shuffle Sort Merge Join 的时候就无需再进行分区和排序了。实现也是对两张表参与 Join 的Keys使用相同的分区算法和分区数进行分区目的就是保证相同的 Keys 都落到相同的分区里面。分区完之后再对每个分区按照参与 Join 的 Keys 进行排序最后 Reduce 端获取两张表相同分区的数据进行 Merge Join也就是 Keys 相同说明 Join 上了。3.3.4 Cartesian product join如果 Spark 中两张参与 Join 的表没指定ON 条件或者指定类似 on 11 那么会产生 Cartesian product join这个 Join 得到的结果其实就是两张行数的乘积。适用条件必须是inner Join其支持等值和不等值 Join。Cartesian product join 产生数据的行数是两表的乘积当 Join 的表很大时其效率是非常低下的所以我们尽量不要使用这种 Join3.3.5 Broadcast nested loop join (BNLJ)在某些情况会对某张表重复扫描多次效率非常低下。从名字可以看出BNLJ 会根据相关条件对小表进行广播以减少表的扫描次数。触发广播的需要满足以下三个条件之一right outer join 是会广播左表left outer, left semi, left anti 或者 existence join 时会广播右表inner join 的时候两张表都会广播。适用条件支持等值和不等值 Join支持所有的 Join 类型四、 Spark 如何选择 Join 策略在Spark SQL的物理计划阶段JoinSelection对象会根据是否等值连接、连接提示类型(hint)、连接的数据集大小以及参与连接的key是否可以排序等条件进行连接策略选取基于JoinSelection对象中选择连接策略的逻辑绘制以下流程图如果是等值 Join等值连接是一个在连接条件中只包含equals比较的连接所有连接运算符都支持等值连接用户是不是指定了 BROADCAST hint BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一个如果指定了那就用 Broadcast Hash Join用户是不是指定了 SHUFFLE MERGE hint SHUFFLE_MERGE、MERGE 以及 MERGEJOIN 中的一个如果指定了那就用 Shuffle sort merge join用户是不是指定了 Shuffle Hash Join hint SHUFFLE_HASH如果指定了那就用 Shuffle Hash Join用户是不是指定了 shuffle-and-replicate nested loop join hint SHUFFLE_REPLICATE_NL如果指定了那就用 Cartesian product join如果用户没有指定任何 Join hint那根据 Join 的适用条件按照Broadcast Hash Join - Shuffle Hash Join - Sort Merge Join -Cartesian Product Join - Broadcast Nested Loop Join顺序选择 Join 策略如果是不等值 Join非等值连接包含除equals以外的任何比较例如, , , 由于非等值连接需要对不确定的值的范围进行比较因而嵌套循环是必须的对于非等值连接Spark SQL只支持Broadcast Nested Loop Join广播嵌套循环连接和Cartesian Product Join笛卡尔乘积连接。用户是不是指定了 BROADCAST hint BROADCAST、BROADCASTJOIN 以及 MAPJOIN 中的一个如果指定了那就广播对应的表并选择 Broadcast Nested Loop Join用户是不是指定了 shuffle-and-replicate nested loop join hint SHUFFLE_REPLICATE_NL如果指定了那就用 Cartesian product join如果用户没有指定任何 Join hint那根据 Join 的适用条件按照Broadcast Nested Loop Join -Cartesian Product Join - Broadcast Nested Loop Join顺序选择 Join 策略五、其他问题5.1 为什么先判断是否等值连接等值连接是一个在连接条件中只包含equals比较的连接所有连接运算符都支持等值连接非等值连接包含除equals以外的任何比较例如, , , 由于非等值连接需要对不确定的值的范围进行比较因而嵌套循环是必须的对于非等值连接Spark SQL只支持Broadcast Nested Loop Join广播嵌套循环连接和Cartesian Product Join笛卡尔乘积连接。5.2 join操作一定发生shuffle吗不一定。只有 SortMergeJoinExec 和 ShuffledHashJoinExec 这两种类型的join实现会发生shuffle详细可以看下这几种join的选择策略。5.3 Spark如何决定是否使用广播连接Broadcast Join表大小预估阈值对比Spark会预估表的大小默认使用统计信息中的行数和平均行长度计算若小表的预估大小小于spark.sql.autoBroadcastJoinThreshold参数默认10MB则可能触发广播连接。统计信息准确性若表的统计信息如行数、大小未更新或不准确可能导致预估错误进而影响广播决策。运行时动态优化AQE自适应查询执行在运行时AQE会根据实际数据分布动态调整Join策略。例如若发现某表实际大小远小于预估可能在运行时切换为广播连接。内存可用性广播连接需要将小表加载到内存中若Executor内存不足Spark可能回退到其他策略如Sort Merge Join。显式提示与配置SQL Hint使用/* BROADCASTJOIN */提示可强制Spark尝试广播连接但实际是否生效仍取决于表大小和内存条件。参数配置通过设置spark.sql.autoBroadcastJoinThreshold调整广播阈值或设置spark.sql.autoBroadcastJoinThreshold-1禁用自动广播。异常处理超时与重试若广播过程超时默认1800秒Spark会抛出异常并提示调整spark.sql.broadcastTimeout或spark.sql.broadcastMaxRetries参数或禁用广播连接。5.4 为什么我的表比广播阈值小但是没有走广播join原因参考5.3解决1提高预估准确性先将select查到的数据生成为中间临时表再去join起来可以正常估算表的大小case参见6.1解决2调大广播阈值5.5 Broadcast Join 一定比 Shuffle Join 快先给结论一般情况是但不是一定的可以看下这篇文章核心我们要了解下 Broadcast Join 的运行机制在进行 Broadcast Join 之前Spark 需要把处于 Executor 端的数据先发送到 Driver 端然后 Driver 端再把数据广播到 Executor 端。如果我们需要广播的数据比较多比如我们把 spark.sql.autoBroadcastJoinThreshold 这个参数设置到 1G如果我们的 Driver 端的内存值设置为 500M那这种情况下会导致 Driver 端出现 OOM。Driver端需要 collects广播“小表”的数据Driver 端构建hashtableDriver 把构建好的 hashtable发送到 Executor 端怎么提升 Broadcast Join 的性能针对上面的分析我们能不能不把数据 collect 到 Driver 端而直接在 Executor 端之间进行数据交换呢这就是 Workday 的工程师团队给我们带来的 Executor 端的 broadcast这项工作可以参见 SPARK-17556。我们来看看 Executor 端的 broadcast 工作原理Executors 把 Join 需要的数据 broadcasted 给其他 ExecutorsDriver 端只负责记录 Executors 端的 block 信息这样其他 Executor 就可以知道 block 可以从哪些 Executor 获取。总结在数据量一样的情况下如果 core 的个数比较多Shuffle Join 是有优势的如果非广播的表数据量数据量越来越大Broadcast Join 是有优势的如果加大广播表的数据量Driver 端的 Broadcast 是跑不出结果Executor 的 Broadcast Join 是比较快的。但是Executor 端的 Broadcast 特性是2016年9月就提的截止到最新的 Apache Spark 3.0.0 这个功能还没有合并到主分支5.6 广播表的大小有限制吗被广播表的大小阈值不能超过8GB总行数不能超过3.4亿注意如果广播表调整的很大对应的driver端内存也需要调大~5.7 spark shuffle 2.0以上已经不用hash shuffle了那join的时候还用hash join 么shuffle是一种数据分发的方式它的实现代表的是两个stage之间的数据按照什么方式移动join是发生在某个stage中hash join是指把小表构建成hash表和基表进关联的操作。hash shuffle被弃用了hash join在ShuffledHashJoinExec 和 BroadcastHashJoinExec这两种join的实现中还在使用。5.8 为什么是5种join策略而不是6种2种网络分发模式和3种Join实现方式明明应该有6种Join策略为什么没有Broadcast Sort Merge Join广播SMJ策略没必要本身被广播的数据集就比较小hash join 和 NLJ 完全够用了远比SMJ的机制的执行效率更高而且SMJ本身是针对大表关联大表的join算法。六、优化案例6.1 案例1-SortMergeJoin改写BroadcastHashJoin思路参见5.4优化效果20min → 51s优化前20min优化后39s12s前置处理 51s问题描述某天一个任务比平时慢了很多排查下来存在几个问题其中有一个就是走了不合理的join策略简化sqldroptableifexiststemp.temp_teble_06;createtabletemp.temp_teble_06asselecta.xxxx,b.xxx,c.xxxfromtemp.temp_teble_01 ajoin(selecta.*fromtemp.temp_teble_02 aleftjoin(selectidfrom(selectidfromtemp.temp_teble_04groupbyidunionallselectidfromtemp.temp_teble_05groupbyid)tgroupbyid)bona.idb.idwherea.xxxx2andb.idisnull)bona.column1b.column1anda.column2b.column2jointemp.temp_teble_03 conb.column1c.column1andb.column2c.column2andc.column31;其实从上面的截图我们可以看到temp.temp_teble_01扫描到了59847849条数据而第一次join子查询的结果只有116条数据但是竟然走了sortmergejoin然后变成了300215424条数据那么我们很自然的想到可以先把子查询提取出来作为临时表只有116条数据然后再join让其走BroadcastHashJoin改写sqldroptableifexiststemp.temp_teble_06_1;createtabletemp.temp_teble_06_1asselecta.*fromtemp.temp_teble_02 aleftjoin(selectidfrom(selectidfromtemp.temp_teble_04groupbyidunionallselectidfromtemp.temp_teble_05groupbyid)tgroupbyid)bona.idb.idwherea.xxxx2andb.idisnull;droptableifexiststemp.temp_teble_06;createtabletemp.temp_teble_06asselecttb1.xxxx,tb2.xxx,tb3.xxxfromtemp.temp_teble_01 tb1jointemp.temp_teble_06_1 tb2ontb1.column1tb2.column1andtb1.column2tb2.column2jointemp.temp_teble_03 tb3ontb2.column1tb3.column1andtb2.column2tb3.column2andtb3.column31;
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2451319.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!