Spark 应用调优

news2025/7/31 12:53:47

Spark 应用调优

  • 人数统计
    • 优化
  • 摇号次数分布
    • 优化
      • Shuffle 常规优化
      • 数据分区合并
      • 加 Cache
    • 优化
  • 中签率的变化趋势
  • 中签率局部洞察
    • 优化
  • 倍率分析
    • 优化

表信息 :

  • apply : 申请者 : 事实表
  • lucky : 中签者表 : 维度表
  • 两张表的 Schema ( batchNum,carNum ) : ( 摇号批次,申请编号 )
  • 分区键都是 batchNum

运行环境 :

在这里插入图片描述

配置项设置 :

在这里插入图片描述

优化点 :

在这里插入图片描述

人数统计

统计至今,参与摇号的总人次和幸运的中签者人数

val rootPath: String = _

// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
applyNumbersDF.count

// 中签者数据
val hdfs_path_lucky = s"${rootPath}/lucky"
val luckyDogsDF = spark.read.parquet(hdfs_path_lucky)
luckyDogsDF.count

SQL 实现 :

select
	count(*)
from applyNumbersDF

select
	count(*)
from luckyDogsDF

去重计数,得到实际摇号数 :

val applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinct

applyDistinctDF.count

SQL 实现 :

select
	count(distinct batchNum ,carNum)
from applyDistinctDF

优化

分析 : 共有 3 个 Actions,会触发 3 个 Spark Jobs
用 Cache 原则:

  • RDD/DataFrame/Dataset 引用次数为 1,坚决不用 Cache
  • 当引用次数大于 1,且运行成本占比超过 30%,考虑用 Cache

优化 :

  • 利用 Cache 机制来提升执行性能
val rootPath: String = _

// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
// 缓存
applyNumbersDF.cache

applyNumbersDF.count

val applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinct
applyDistinctDF.count

在这里插入图片描述

摇号次数分布

不同人群摇号次数的分布 :

  • 统计所有申请者累计参与了多少次摇号
  • 所有中签者摇了多少次号才能幸运地摇中签

统计所有申请者的分布情况

val result02_01 = applyDistinctDF
  .groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis"))
  .groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis"))
  .orderBy("x_axis")

result02_01.write.format("csv").save("_")

SQL 实现 :

with t1 as (
  select
  	carNum,
  	count(1) as x_axis
  from applyDistinctDF
  group by carNum
)
select
	x_axis,
	count(1) as y_axis
from t1
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 共两次 Shuffle。以 carNum 做分组计数, 以 x_axis 列再次做分组计数

Shuffle 的本质 : 数据的重新分发,凡是有 Shuffle 地方就要关注数据分布

  • 对过小的数据分片,要对进行合并

Shuffle 常规优化

优化点 : 减少 Shuffle 过程中磁盘与网络的请求次数

Shuffle 的常规优化:

  • By Pass 排序操作 : 条件:计算逻辑不涉及聚合或排序;Reduce 的并行度 < spark.shuffle.sort.bypassMergeThreshold
  • 调整读写缓冲区 : 条件 : Execution Memory 大

对读写缓冲区做调优 :

  • spark.shuffle.file.buffer : Map 写入缓冲区大小
  • spark.reducer.maxSizeInFlight : Reduce 读缓冲区大小

读写缓冲区是以 Task 为粒度进行设置,所以调整这些参数时, 扩大 50%

默认调优
spark.shuffle.file.buffer = 32KBspark.shuffle.file.buffer = 48 KB (32KB * 1.5)
spark.reducer.maxSizeInFlight = 48 MBspark.reducer.maxSizeInFlight = 72MB ( 48MB * 1.5)

性能对比 :

在这里插入图片描述

数据分区合并

优化点 : 提升 Reduce 阶段的 CPU 利用率

该数据集在内存的精确大小 :

def sizeNew(func: => DataFrame, spark: => SparkSession): String = {
  val result = func
  val lp = result.queryExecution.logical
  val size = spark.sessionState.executePlan(lp).optimizedPlan.stats.sizeInByte
  
  "Estimated size: " + size/1024 + "KB"
}

把 applyDistinctDF 作实参,调用 sizeNew 函数,返回大小 = 2.6 GB

  • 将数据集尺寸/并行度(spark.sql.shuffle.partitions = 200) = Reduce 每个数据分片的存储大小 ( 2.6 GB / 200 = 13 MB)
  • 数据分片大小在 200 MB 左右为宜,13 MB 太小

优化设置 :

  • 计算集群配置 Executors core = 3 * 2 = 6,其 minPartitionNum 为 6
# 开启 AQE
spark.sql.adaptive.enabled = true

# 自动分区合并
spark.sql.adaptive.coalescePartitions.enabled = true
# 合并后的大小
spark.sql.adaptive.advisoryPartitionSizeInBytes = 160MB/200MB/210MB/400MB
# 分区合并后的最小分区数
spark.sqladaptive.coalescePartitions.minPartitionNum = 6

总结 :

  • 并行度过高、数据分片过小,CPU 调度开销会变大,执行性能也变差
  • 检验值 : 分片粒度为 200 MB 左右时,执行性能是最优的
  • 并行度过低、数据分片过大,CPU 数据处理开销也会过大,执行性能会锐减

性能对比 :

在这里插入图片描述

加 Cache

Cache : 避免数据集在磁盘中的重复扫描与重复计算

applyDistinctDF.cache
applyDistinctDF.count

val result02_01 = applyDistinctDF
  .groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis"))
  .groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis"))
  .orderBy("x_axis")

result02_01.write.format("csv").save("_")

性能对比 :

在这里插入图片描述


得到中签者的摇号次数

val result02_02 = applyDistinctDF
  .join(luckyDogsDF.select("carNum"), Seq("carNum"), "inner")
  .groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis"))
  .groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis"))
  .orderBy("x_axis")

result02_02.write.format("csv").save("_")

SQL 实现 :


with t3 as (
  select
    carNum,
  	count(1) as x_axis
  from applyDistinctDF t1 join luckyDogsDF t2
  	on t1.carNum = t2.carNum
  group by carNum
)
select
	x_axis,
	count(1) as y_axis
from t3
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 计算中有一次数据关联,两次分组、聚合,排序

  • applyDistinctDF 有 1.35 亿条记录
  • luckyDogsDF 有 115 w条记录
  • 大表 Join 小表,最先想用广播变量

用广播变量来优化大小表关联计算 :

  • 估算小表在内存中的存储大小
  • 设置广播阈值 spark.sql.autoBroadcastJoinThreshold

sizeNew 计算 luckyDogsDF ,得到大小 = 18.5MB

设置广播阈值要大于 18.5MB ,即 : 设置为 20MB :

spark.sql.autoBroadcastJoinThreshold = 20MB

性能对比 :

在这里插入图片描述

中签率的变化趋势

计算中签率,分别统计每个摇号批次中的申请者和中签者人数

// 统计每批次申请者的人数
val apply_denominator = applyDistinctDF
  .groupBy(col("batchNum"))
  .agg(count(lit(1)).alias("denominator"))

// 统计每批次中签者的人数
val lucky_molecule = luckyDogsDF
  .groupBy(col("batchNum"))
  .agg(count(lit(1)).alias("molecule"))

val result03 = apply_denominator
  .join(lucky_molecule.select, Seq("batchNum"), "inner")
  .withColumn("ratio", round(col("molecule")/ col("denominator"), 5))
  .orderBy("batchNum")

result03.write.format("csv").save("_")

SQL 实现 :

with t1 as (
  select
  	batchNum,
  	count(1) as denominator
  from applyDistinctDF
  group by batchNum
),
t2 as (
  select
  	batchNum,
  	count(1) as molecule
  from luckyDogsDF
  group by batchNum
)
select
  batchNum,
	round(molecule/denominator, 5) as ratio
from t1 join t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

中签率局部洞察

统计 2018 年的中签率

// 筛选出2018年的中签数据,并按照批次统计中签人数
val lucky_molecule_2018 = luckyDogsDF
  .filter(col("batchNum").like("2018%"))
  .groupBy(col("batchNum"))
  .agg(count(lit(1)).alias("molecule"))

// 通过与筛选出的中签数据按照批次做关联,计算每期的中签率
val result04 = apply_denominator
  .join(lucky_molecule_2018, Seq("batchNum"), "inner")
  .withColumn("ratio", round(col("molecule")/ col("denominator"), 5))
  .orderBy("batchNum")

result04.write.format("csv").save("_")

SQL 实现 :

with t1 as (
  select
  	batchNum,
  	count(1) as molecule
  from luckyDogsDF
  where batchNum like '2018%'
  group by batchNum
)
select
	batchNum,
	round(molecule/denominator, 5)
from apply_denominator t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

优化

DPP 的条件 :

  • 事实表必须是分区表,且分区字段(可以是多个)必须包含 Join Key
  • DPP 仅支持等值 Joins,不支持大于、小于这种不等值关联关系
  • 维表过滤后的数据集,要小于广播阈值,调整 spark.sql.autoBroadcastJoinThreshold

DPP 优化 :

  • 降低事实表 applyDistinctDF 的磁盘扫描量
applyDistinctDF.select("batchNum", "carNum").distinct

applyDistinct.count

性能对比 :

在这里插入图片描述

倍率分析

倍率的分布情况 :

  • 不同倍率下的中签人数
  • 不同倍率下的中签比例

2016 年后的不同倍率下的中签人数 :

val result05_01 = applyNumbersDF
  .join(luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum"), Seq("carNum"), "inner")
  .groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier"))
  .groupBy("carNum").agg(max("multiplier").alias("multiplier"))
  .groupBy("multiplier").agg(count(lit(1)).alias("cnt"))
  .orderBy("multiplier")

result05_01.write.format("csv").save("_")
with t3 as (
  select
  	batchNum,
  	carNum,
    count(1) as multiplier
  from applyNumbersDF t1 
  	join luckyDogsDF t2 on t1.carNum = t2.carNum
  where t2.batchNum >= '201601'
  group by batchNum, carNum
),
t4 as (
  select
    carNum,
  	max(multiplier) as multiplier
  from t3
  group by carNum
)
select
	multiplier,
	count(1) as cnt
from t4
group by multiplier
order by multiplier;

在这里插入图片描述

优化

关联中的 Join Key 是 carNum (非分区键),所以无法用 DPP 机制优化

将大表 Join 小表 , SMJ 转 BHJ :

  • 计算 luckyDogsDF 的内存大小,确保 < 广播阈值,利用 Spark SQL 的静态优化机制将 SMJ 转为 BHJ
  • 确保过滤后 luckyDogsDF < 广播阈值,利用 Spark SQL 的 AQE 机制动态将 SMJ 转为 BHJ
# 静态BHJ
spark.sql.autoBroadcastJoinThreshold = 20MB

# AQE 动态BHJ
spark.sql.autoBroadcastJoinThreshold = 10MB

性能对比 :

在这里插入图片描述


计算不同倍率人群的中签比例

// Step01: 过滤出2016-2019申请者数据,统计出每个申请者在每期内的倍率,并在所有批次中选取
val apply_multiplier_2016_2019 = applyNumbersDF
  .filter(col("batchNum") >= "201601")
  .groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier"))
  .groupBy("carNum").agg(max("multiplier").alias("multiplier"))
  .groupBy("multiplier").agg(count(lit(1)).alias("apply_cnt"))

// Step02: 将各个倍率下的申请人数与各个倍率下的中签人数左关联,并求出各个倍率下的中签率
val result05_02 = apply_multiplier_2016_2019
  .join(result05_01.withColumnRenamed("cnt", "lucy_cnt"), Seq("multiplier"), "left")
  .na.fill(0)
  .withColumn("ratio", round(col("lucy_cnt")/ col("apply_cnt"), 5))
  .orderBy("multiplier")

result05_02.write.format("csv").save("_")

SQL 实现 :

with t5 as (
  select
  	batchNum,
  	carNum
  	count(1) as multiplier
  from applyNumbersDF 
  where batchNum >= '201601'
  group by batchNum, carNum
),
t6 as (
  select
  	carNum,
  	max(multiplier) as multiplier
  from t1
  group by carNum
),
t7 as (
  select
  	multiplier,
  	count(1) as apply_cnt
  from t2 
  group by multiplier
)
select 
	multiplier,
	round(coalesce(lucy_cnt, 0)/ apply_cnt, 5) as ratio
from t7 left 
	left join t5 on t5.multiplier = t7.multiplier
order by multiplier;

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/394493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java基础——Lambda表达式

一、函数式编程思想概述https://www.runoob.com/java/java8-lambda-expressions.html在数学中&#xff0c;函数就是有输入量、输出量的一套计算方案&#xff0c;也就是“拿数据做操作”面向对象强调“必须通过对象的形式来做事情”&#xff0c;做事情之前首先要创建一个对象函数…

B站游戏推广,怎样查看B站游戏推广榜数据?

近年来&#xff0c;B站发展得越来越大&#xff0c;越来越多的游戏商也加入B站进行推广&#xff0c;那么作为游戏运营商&#xff0c;怎样查看游戏在B站上的推广数据呢&#xff1f;游戏排行榜游戏商业推广榜包含视频推广榜、直播推广榜&#xff0c;按照日榜、周榜、月榜&#xff…

【Linux 网络编程1】使用UDP/TCP编写套接字,多进程/多线程版本的TCP编写的套接字,将套接字封装

目录 1.学习网络编程前的一些基础知识 2.UDP(user datagram protocol)协议的特点 3.使用有UPD编写套接字 4.使用TCP编写套接字 4.2.TCP客服端 4.3.TCP服务器端 4.4.单进程版本&#xff08;没有人会使用&#xff09; 4.5.多进程版本 4.6.多线程版本 5.把套接字封装 1…

扬帆优配|2600亿新能源巨头狂飙!外资唱多中国:再涨15%

全国停摆的危机&#xff0c;正在迫临法国。 大停工正在将法国推向风险境地。法国政府估计&#xff0c;当地时间3月7日&#xff0c;将迸发全国大型停工游行。法国总工会宣告&#xff0c;到时将让全法国停摆。法国担任交通业务的部长级代表克莱蒙博讷正告称&#xff0c;7日将成为…

JavaScript 混淆技术

根据JShaman&#xff08;JShaman是专业的JavaScript代码混淆加密网站&#xff09;提供的消息&#xff0c;JavaScript混淆技术大体有以下几种&#xff1a; 变量混淆 将带有JS代码的变量名、方法名、常量名随机变为无意义的类乱码字符串&#xff0c;降低代码可读性&#xff0c;如…

原神 Android 教程 —安卓版

准备材料 一台能读写 /system 分区的 Android 手机(或:一台安装了 Magisk 的 Android 手机) 有人搞出来免root端了,此条件不再必须私服客户端

数据同步工具Sqoop

大数据Hadoop之——数据同步工具SqoopSqoop基本原理及常用方法 1 概述 Apache Sqoop&#xff08;SQL-to-Hadoop&#xff09;项目旨在协助RDBMS&#xff08;Relational Database Management System&#xff1a;关系型数据库管理系统&#xff09;与Hadoop之间进行高效的大数据交…

HStream Console、HStreamDB 0.14 发布

近两个月&#xff0c;HStreamDB 相继发布了 0.13 和 0.14 版本&#xff0c;包含多项已知问题修复。同时&#xff0c;我们也发布了全新的 HStream Console 组件&#xff0c;为 HStreamDB 带来了简洁友好的图形化管理界面&#xff0c;将帮助用户更轻松地使用和管理 HStreamDB. H…

LinqConnect兼容性并支持Visual Studio 2022版本

LinqConnect兼容性并支持Visual Studio 2022版本 现在支持Microsoft Visual Studio 2022版本17.5预览版。 添加了Microsoft.NET 7兼容性。 共享代码-共享相同的代码&#xff0c;以便在不同的平台上处理数据。LinqConnect是一种数据库连接解决方案&#xff0c;适用于不同的基于.…

Ubuntu下安装Docker

大家好&#xff0c;我是中国码农摘星人。 欢迎分享/收藏/赞/在看&#xff01; 文章目录1 安装Docker1.1 使用官方安装脚本自动安装 &#xff08;仅适用于公网环境&#xff09;1.2 手动安装帮助1.2.1 Ubuntu 14.04 16.04 (使用apt-get进行安装)2 安装校验3 镜像加速器3.1 安装&a…

在 k8S 中搭建 SonarQube 7.4.9 版本(使用 PostgreSQL 数据库)

本文搭建的 SonarQube 版本是 7.4.9-community&#xff0c;由于在官方文档中声明 7.9 版本之后就不再支持使用 MySQL 数据库。所以此次搭建使用的数据库是 PostgreSQL 11.4 版本。 一、部署 PostgreSQL 服务 1. 创建命名空间 将 PostgreSQL 和 SonarQube 放在同一个命名空间…

Docker(二)

5.容器数据卷 1.什么是容器数据卷 docker理念回顾 将应用和环境打包成一个镜像&#xff01; 数据&#xff1f;如果数据都在容器中&#xff0c;那么我们容器删除&#xff0c;数据就会丢失&#xff01;需求&#xff1a;数据可以持久化 MySQL&#xff0c;容器删了&#xff0c…

都工作3年了,怎么能不懂双亲委派呢?(带你手把手断点源码)

&#x1f497;推荐阅读文章&#x1f497; &#x1f338;JavaSE系列&#x1f338;&#x1f449;1️⃣《JavaSE系列教程》&#x1f33a;MySQL系列&#x1f33a;&#x1f449;2️⃣《MySQL系列教程》&#x1f340;JavaWeb系列&#x1f340;&#x1f449;3️⃣《JavaWeb系列教程》…

c盘怎么清理到最干净?有什么好的清理方法

c盘怎么清理到最干净?有什么好的清理方法&#xff1f;清理C盘空间是电脑维护的重要步骤之一。C盘是Windows操作系统的核心部分&#xff0c;保存了许多重要的系统文件&#xff0c;因此空间不足会影响计算机的性能和稳定性。下面是一些清理C盘空间的方法 一.清理临时文件 在使用…

【Java学习笔记】10.条件语句 - if...else及switch case 语句

前言 本章介绍Java的条件语句 - if…else和switch case 语句。 Java 条件语句 - if…else Java 中的条件语句允许程序根据条件的不同执行不同的代码块。 一个 if 语句包含一个布尔表达式和一条或多条语句。 语法 if 语句的语法如下&#xff1a; if(布尔表达式) {//如果布…

实验二:动态规划

1.双11的红包雨 问题描述 双11到了&#xff0c;据说这2天会下红包雨&#xff0c;每个红包有不同的价值&#xff0c;小k好开心&#xff0c;但有个规则&#xff0c;就只能接掉落在他身旁的10米范围内的红包&#xff08;0-10这11个位置&#xff09;。小k想尽可能的多抢红包&…

评价提高分子对接性能的组合策略

评价提高分子对接性能的组合策略 相关背景&#xff1a; 分子对接可能是应用于基于结构的药物设计和虚拟筛选活动中最快速、成本最低的计算技术&#xff0c;它可以从巨大的化合物文库中找到潜在的活性分子&#xff0c;用于先导发现。 在分子对接中&#xff0c;配体(通常是小分…

LIME: Low-light Image Enhancement viaIllumination Map Estimation

Abstract当人们在低光条件下拍摄图像时&#xff0c;图像通常会受到低能见度的影响。除了降低图像的视觉美感外&#xff0c;这种不良的质量还可能显著降低许多主要为高质量输入而设计的计算机视觉和多媒体算法的性能。在本文中&#xff0c;我们提出了一种简单而有效的微光图像增…

2023年最新的站内SEO指南:如何通过关键词优化提高网站排名

SEO或搜索引擎优化是指通过改善网站的内部和外部元素&#xff0c;以获得更好的自然搜索引擎排名和更多的网站流量。 链接建设和外链是SEO的重要组成部分&#xff0c;因为它们可以提高网站的权威性和可信度&#xff0c;从而使其在搜索引擎中排名更高。 在此指南中&#xff0c;…

MySQL三范式

1、查询语句写的烂2、索引失效&#xff08;数据变更&#xff09;3、关联查询太多join&#xff08;设计缺陷或不得已的需求&#xff09;4、服务器调优及各个参数设置&#xff08;缓冲、线程数等&#xff09; 通常SQL调优过程&#xff1a; 观察&#xff0c;至少跑1天&#xff0…