Spark Join大大表

news2025/8/7 16:59:22

Spark Join大大表

  • 分而治之
    • 拆分内表
    • 外表的重复扫描
    • 案例
  • 负隅顽抗
    • 数据分布均匀
    • 数据倾斜
      • Task 数据倾斜
      • Executor 数据倾斜
      • 两阶段 Shuffle
      • Executors 调优案例

Join 大大表 :

  • Join 的两张体量较大的事实表,尺寸相差在 3 倍内,且无法广播变量
  • 用大表 Join 大表才能实现业务逻辑,说明 : 数据仓库在设计初时,考虑不够完善
  • 大表 Join 大表的调优思路:分而治之/ 负隅顽抗

分而治之

分而治之的调优思路:把复杂任务拆解成多个简单任务,再合并多个简单任务的计算结果

分而治之的计算过程:

  1. 根据两表的大小区分:外表/ 内表(较小)
  2. 对内表进行过滤,并把内表划分为多个不重复的完整子集
  3. 把外表和这些子集做关联,得到部分计算结果
  4. 最后用 Union 把所有结果合并到一起,得到完整的计算结果

在这里插入图片描述

拆分内表

内表拆分:要求每个子表的尺寸相对均匀, 且都小到进行广播变量

拆分的关键 : 选取的列,要让子表足够小 :

  • 基数低 : 内表的拆分列是性别(男/女),性别基数是 2。这拆出来的子表还是很大,远远超出广播阈值
  • 基数大 : 拆分:身份证号。缺点:不易拆分,开发成本高 ; 过滤条件很难触发优化机制(谓词下推)
  • 拆分:时间。一般事实表都与时间相关。既能享受分区剪裁(Partition Pruning),又降低开发成本

外表的重复扫描

外表的重复扫描 :

  • 内表拆分后,外表会和所有子表做关联,但每次关联都要重新扫描外表的全量数据
  • 外表扫描的次数 = 内表拆分的份数

在这里插入图片描述

解决数据重复扫描:

  • Cache,要求资源非常吊
  • 用 DPP 机制,对外表进行分区过滤

DPP 机制:

  • 每个子查询只扫描外表的子集,把所有子集加起来,就是外表的全量数据

在这里插入图片描述

案例

orders 和 transactions 都是事实表,都是 TB 级别 :

//orders 订单表
orderId: Int
customerId: Int
status: String
date: Date //分区键

//lineitems 交易明细表
orderId: Int //分区键
txId: Int
itemId: Int
price: Float
quantity: Int

每隔一段时间 ,计算上个季度所有订单的交易额 :

val query: String = "
  select sum(tx.price * tx.quantity) as revenue, 
  	o.orderId
  from transactions as tx 
  	inner join orders as o on tx.orderId = o.orderId
  where o.status = 'COMPLETE'
  	and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId
"

transactions 是外表,orders 是内表(较小)

  • 对 date 字段进行以天拆分
//以date字段拆分内表
val query: String = "
  select sum(tx.price * tx.quantity) as revenue, 
  	o.orderId
  from transactions as tx 
  	inner join orders as o on tx.orderId = o.orderId
  where o.status = 'COMPLETE'
  	and o.date = '2020-01-01'
  group by o.orderId
"

内表拆分后,外表与所有子表做关联,把全部子关联的结果合并

//循环遍历 dates
val dates: Seq[String] = Seq("2020-01-01", "2020-01-02",..."2020-03-31")

for (date <- dates) {
  val query: String = s"
    select sum(tx.price * tx.quantity) as revenue, 
  		o.orderId
    from transactions as tx inner 
  		join orders as o on tx.orderId = o.orderId
    where o.status = 'COMPLETE'
    	and o.date = ${date}
    group by o.orderId
  " 
  val file: String = s"${outFile}/${date}"
  spark.sql(query).save.parquet(file)
}

负隅顽抗

负隅顽抗 : 当内表没法均匀拆分,或外表没有分区键,不能利用 DPP,只能依赖 Shuffle Join,来完成 Join 大大表

数据分布均匀

默认 Shuffle Sort Merge Join 转为 Shuffle Hash Join 条件:

  • 两表数据分布均匀
  • 内表所有数据分片,能放入内存

每个数据分片的切分 :

  • 根据并发度/执行内存,计算每个 Task 消耗的内存上下限
  • 结合分布式数据集尺寸与上下限,计算出并行度

利用 Join Hints 选择 Shuffle Hash Join


select /*+ shuffle_hash(orders) */ 
	sum(tx.price * tx.quantity) as revenue, 
	o.orderId
from transactions as tx inner 
	join orders as o on tx.orderId = o.orderId
where o.status = 'COMPLETE'
	and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId

数据倾斜

Join 大大表数据倾斜情况 :

在这里插入图片描述

Task 数据倾斜

利用 AQE 解决自动倾斜处理。配置参数 :

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor : 判定倾斜的膨胀系数
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes : 判定倾斜的最低阈值
  • spark.sql.adaptive.advisoryPartitionSizeInBytes : 定义拆分粒度 (字节)

AQE 自动倾斜处理 :

  • 外表的倾斜分区,以 spark.sql.adaptive.advisoryPartitionSizeInBytes 把倾斜分区拆分为多个数据分区
  • 对内表的数据分区进行复制

在这里插入图片描述

Task 的负载均衡 :

  • AQE 只能处理 Task 倾斜,Executors 的负载倾斜并没有改善

在这里插入图片描述

Executor 数据倾斜

解决 Executors 的数据倾斜的方法 :分而治之/ 两阶段 Shuffle

分而治之 :

  • 对外表中所有 Join Keys,按照倾斜分为两组 (倾斜的 Join Keys / 分布均匀的 Join Keys)
  • 按照两组 Join Keys,对内表分为两份
  • 对内外表的两组数据,分别用不同方法做关联计算
  • 分布均匀的数据,把 Shuffle Sort Merge Join 转为 Shuffle Hash Join
  • 倾斜数据,用两阶段 Shuffle,平衡 Executors 之间的工作负载
  • 再把两个关联结果集进行 Union

在这里插入图片描述

两阶段 Shuffle

两阶段 Shuffle:

  • 第一阶段 :目的将数据打散、平衡计算负载,通过 加盐、Shuffle、关联、聚合
  • 第二阶段 :计算结果,通过 去盐化、Shuffle、聚合
  • 不破坏原有关联关系下,平衡 Executors 之间的计算负载

在这里插入图片描述

第一阶段:对倾斜 Join Keys 加盐 (粒度 : Executors 总数)

  • 外表/内表做不同加盐处理

对外表进行随机加盐 :

  • 对倾斜的 Join Key,都 + 随机后缀 (1 - #N)

在这里插入图片描述

内表进行复制加盐 :

  • 对倾斜的 Join Key,把原数据复制 (Executors 总数 – 1),得到 (Executors 总数) 份数据副本
  • 对每份副本,按 Join Key +固定后缀 (1 - #N) ,与打散后的外表数据保持一致

在这里插入图片描述

第二阶段 :

  • 把每个 Join Key 的后缀去掉 (去盐化)
  • 按照原来的 Join Key 进行 Shuffle 和聚合计算,得出倾斜组的计算结果
  • 将倾斜的结果和均匀的结果进行合并

在这里插入图片描述

Executors 调优案例

orders 和 transactions 都 TB 级别的事实表,计算上个季度所有订单的交易额

//统计订单交易额的代码实现
val txFile: String = _
val orderFile: String = _

val transactions: DataFrame = spark.read.parquent(txFile)
val orders: DataFrame = spark.read.parquent(orderFile)

transactions.createOrReplaceTempView("transactions")
orders.createOrReplaceTempView(“orders”)

val query: String = "
  select sum(tx.price * tx.quantity) as revenue, 
		o.orderId
  from transactions as tx 
		inner join orders as o on tx.orderId = o.orderId
  where o.status = 'COMPLETE'
  	and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId
"

val outFile: String = _
spark.sql(query).save.parquet(outFile)

把倾斜的 orderId 保存在数组 skewOrderIds 中,把均匀的 orderId 保持在数组 evenOrderIds 中

//根据Join Keys是否倾斜、将内外表分别拆分为两部分
import org.apache.spark.sql.functions.array_contains

//将Join Keys分为两组,存在倾斜的、和分布均匀的
val skewOrderIds: Array[Int] = _
val evenOrderIds: Array[Int] = _

val skewTx: DataFrame = 
	transactions.filter(array_contains(lit(skewOrderIds), $"orderId")
val evenTx: DataFrame = 
	transactions.filter(array_contains(lit(evenOrderIds), $"orderId")

val skewOrders: DataFrame = 
	orders.filter(array_contains(lit(skewOrderIds), $"orderId"))

val evenOrders: DataFrame = 
	orders.filter(array_contains(lit(evenOrderIds), $"orderId"))

对均匀数据,转为 Shuffle Hash Join:

//将分布均匀的数据分别注册为临时表
evenTx.createOrReplaceTempView("evenTx")
evenOrders.createOrReplaceTempView("evenOrders")

val evenQuery: String = "
  select /*+ shuffle_hash(orders) */ 
		sum(tx.price * tx.quantity) as revenue, 
		o.orderId
  from evenTx as tx 
		inner join evenOrders as o on tx.orderId = o.orderId
  where o.status = 'COMPLETE'
  	and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId
"
val evenResults: DataFrame = spark.sql(evenQuery)

对外表做随机加盐,对内表做复制加盐

import org.apache.spark.sql.functions.udf

//定义获取随机盐粒的UDF
val numExecutors: Int = _
val rand = () => scala.util.Random.nextInt(numExecutors)
val randUdf = udf(rand)

//第一阶段的加盐。注意:保留 orderId 字段,用于二阶段的去盐化
//外表随机加盐
val saltedSkewTx = 
	skewTx.withColumn("joinKey", concat($"orderId", lit("_"), randUdf()))

//内表复制加盐
var saltedskewOrders = 
	skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(1)))
for (i <- 2 to numExecutors) {
  saltedskewOrders = saltedskewOrders union skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(i)))
}

对加盐的两张表,进行查询 :

//将加盐后的数据分别注册为临时表
saltedSkewTx.createOrReplaceTempView(“saltedSkewTx”)
saltedskewOrders.createOrReplaceTempView(“saltedskewOrders”)

val skewQuery: String = "
  select /*+ shuffle_hash(orders) */ 
		sum(tx.price * tx.quantity) as initialReven,
  	o.orderId, 
		o.joinKey
  from saltedSkewTx as tx 
		inner join saltedskewOrders as o on tx.joinKey = o.joinKey
  where o.status = 'COMPLETE'
  	and o.date between '2020-01-01' and '2020-03-31'
  group by o.joinKey
"

//第一阶段: 加盐、Shuffle、关联、聚合后的初步结果
val skewInitialResults: DataFrame = spark.sql(skewQuery)

去盐化目的 :把计算的粒度,从加盐 joinKey 恢复为原来的 orderId

  • 只要在 orderId 上进行聚合,就能去盐化
val skewResults: DataFrame = 
  skewInitialResults.select("initialRevenue", "orderId")
  .groupBy(col("orderId"))
  .agg(sum(col("initialRevenue")).alias("revenue"))

把倾斜结果和均匀结果进行合并,就能平衡 Executors 计算负载

evenResults union skewResults

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/394017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

观点丨Fortinet谈ChatGPT火爆引发的网络安全行业剧变

FortiGuard报告安全趋势明确指出“网络攻击者已经开始尝试AI手段”&#xff0c;ChatGPT的火爆之际的猜测、探索和事实正在成为这一论断的佐证。攻守之道在AI元素的加持下也在悄然发生剧变。Fortinet认为在攻击者利用ChatGPT等AI手段进行攻击的无数可能性的本质&#xff0c;其实…

动环监控4大应用价值,这个价值最大

机房管理&#xff0c;已成为企业管理和发展的首要任务。为提升机房管理的效率&#xff0c;应从空间环境、设备性能等多角度&#xff0c;对机房环境监测进行实时监控掌握相关数据参数&#xff0c;确保故障隐患及时发现&#xff0c;提高机房整体管理水平&#xff0c;降低运维难度…

Hadoop三大框架之HDFS

一、概述HDFS产生的背景及定义HDFS产生背景随着数据量越来越大&#xff0c;在一个操作系统存不下所有的数据&#xff0c;那么就分配到更多的操作系统管理的磁盘中&#xff0c;但是不方便管理和维护&#xff0c;需要一种系统来管理多台机器上的文件&#xff0c;这就是分布式文件…

一篇文章带你了解折线图

在我们的日常生活中&#xff0c;折线图随处可见&#xff0c;如医院里的心电图、股市里的股票走势、企业中的财务报表分析&#xff0c;但是折线图的具体适用情形&#xff0c;你真的了解吗&#xff1f; 折线图&#xff08;line chart&#xff09;也叫曲线图&#xff0c;用于显示数…

javaWeb在线考试系统

一、项目简介 本项目是一套javaWeb在线考试系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse 确保…

python-pandapower电力系统状态估计(算例1:讲解以及基本算例实现)

提示:专栏解锁后可以查看该专栏所有文章划算 全文截图如下 文章目录 前言一、状态估计1 理论背景2 测量量3 标准偏差二、工具箱函数讲解1 定义测量create_measurement()2 运行状态估计estimate( )3 不良数据处理remove_bad_data();chi2_analysis()remove_bad_data()…

k8s调度之初探nodeSelector和nodeAffinity

在k8s的调度中&#xff0c;有强制性的nodeSelector&#xff0c;节点亲和性nodeAffinity、Pod亲和性podAffinity、pod反亲和性podAntiAffinity。本篇先对nodeSelector和nodeAffinity做个初探。进入主题之前&#xff0c;先看看创建pod的大概过程kubectl向apiserver发起创建pod请求…

【UE4 RTS游戏】01-项目准备

步骤新建一个工程&#xff0c;选择俯视角游戏模板我命名工程如下&#xff1a;删除场景内的所有cube再删除Floor和Wall删除TopDownCharacter删除“NavgationMeshBoundVolume”删除“TamplateLabel”和“RecastNavMesh-Default”删除LightmassImportanceVolume、PostProcessVolum…

【java】java异常分类和异常处理以及面试中常见的问题

文章目录什么是异常&#xff1f;程序错误一般分为三种编译错误运行时错误逻辑错误两个子类区别java几种常见的异常&#xff1a;运行时异常&#xff1a;IOException异常的产生&#xff1a;异常的处理&#xff1a;消极的处理&#xff1a;积极的处理&#xff1a;(异常捕获)throw和…

Jenkins自动化部署入门

Jenkins自动化部署入门 一、简介 Jenkins是一个开源软件项目&#xff0c;是基于Java开发的一种持续集成工具&#xff0c;用于监控持续重复的工作&#xff0c;旨在提供一个开放易用的软件平台&#xff0c;使软件的持续集成变成可能。 Jenkins自动化部署实现原理 二、Jenkins部…

交互:可以执行命令行的框架才是好框架

上一节课&#xff0c;我们开始把框架向工业级迭代&#xff0c;重新规划了目录&#xff0c;这一节课将对框架做更大的改动&#xff0c;让框架支持命令行工具。 第三方命令行工具库 cobra obra 不仅仅能让我们快速构建一个命令行&#xff0c;它更大的优势是能更快地组织起有许多…

SpringBoot整合Mybatis详解

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言一、创建项目&#xff0c;导入依赖&#xff0c;完善项目结构二、编码1.yml配置2.编写实体类3.编写mapper.xml和接口4.编写业务层5.编写控制层6.启动类加上包扫描…

实践分享:Vue 项目如何迁移小程序

最近我们小组刚经历了将成熟的 HTML5 项目转换成小程序&#xff0c;并在app中运行的操作&#xff01;记录下来分享给各位。 项目&#xff1a;将已有的 Vue 项目转为小程序&#xff0c; 在集成了FinClip SDK 的 App 中运行。 技术&#xff1a;uni-app、FinClip 两个注意事项&…

多机器人集群网络通信协议分析

本文讨论的是多机器人网络通信各层的情况和协议。 每个机器人连接一个数据传输通信模块&#xff08;以下简称为数传&#xff0c;也泛指市面上的图传或图数一体的通信模块&#xff09;&#xff0c;数传之间进行组网来传递信息。 根据ISO的划分&#xff0c;网络通信的OSI模型分…

速看|快速软件开发框架突破信息孤岛,高效实现数字化发展!

在企业办公自动化发展的过程中&#xff0c;各部门之间的信息链接存在着链接不及时、信息孤岛的现象。伴随着日益激烈的市场竞争&#xff0c;这样单枪匹马的作战方式已经让不少企业吃尽了苦头&#xff0c;借助快速软件开发框架&#xff0c;可以有效打破信息孤岛&#xff0c;让各…

Jetpack Compose 中适配不同的屏幕尺寸

窗口大小分类 Compose 将 Android 设备的屏幕尺寸分为三类&#xff1a; Compact: 小屏幕&#xff0c;一般就是手机设备&#xff0c;屏幕宽度 < 600dpMedium&#xff1a;中等屏幕&#xff0c;大号的板砖手机如折叠屏或平板的竖屏&#xff0c;600dp < 屏幕宽度 < 840d…

swagger关闭/v2/api-docs仍然可以访问漏洞

今天接到安全团队的说swagger有未授权访问漏洞&#xff0c;即使在swagger关闭的情况下http://127.0.0.1:8086/agcloud/v2/api-docs?group%E7%94%A8%E6%88%B7%E5%85%B3%E8%81%94%E4%BF%A1%E6%81%AF%E6%A8%A1%E5%9D%97仍然还能访问。 看了下原来是有写一个拦截器 registry.addI…

图表控件TeeChart for .NET系列教程六:将数据添加到系列中(使用系列)

TeeChart for .NET是优秀的工业4.0 WinForm图表控件&#xff0c;官方独家授权汉化&#xff0c;集功能全面、性能稳定、价格实惠等优势于一体。TeeChart for .NET 中文版还可让您在使用和学习上没有任何语言障碍&#xff0c;至少可以节省30%的开发时间。 TeeChart for .NET最新…

BFD协议原理

BFD协议原理引入背景不使用BFD带来的问题OSPF感知慢VRRP产生次优路径BFD技术简介BFD会话建立方式和检测机制BFD会话建立过程BFD工作流程BFD的单臂回声BFD默认参数以及调整方法总结引入背景 随着网络应用的广泛部署&#xff0c;网络发生中断可能影响业务正常运行并造成重大损失…

Git开发常用指令及其使用场景

目录前言一、Git安装1.1 官网安装二、配置Git工具2.1 用户信息配置2.2 查看配置三、初始化仓库3.1 创建仓库四、常用命令4.1 git clone拉取远端仓库4.2 git分支操作4.2.1 查看分支4.2.2 分支操作4.3 撤回操作4.3.1 撤回删错的分支4.3.2 撤回提交4.3 拉取代码操作4.4 贮存操作五…