【Spark分布式内存计算框架——Spark Core】4. RDD函数(下) 重分区函数、聚合函数

news2025/6/7 11:17:47

重分区函数

如何对RDD中分区数目进行调整(增加分区或减少分区),在RDD函数中主要有如下三个函数。
1)、增加分区函数
函数名称:repartition,此函数使用的谨慎,会产生Shuffle。
在这里插入图片描述
2)、减少分区函数
函数名称:coalesce,此函数不会产生Shuffle,当且仅当降低RDD分区数目。
比如RDD的分区数目为10个分区,此时调用rdd.coalesce(12),不会对RDD进行任何操作。
在这里插入图片描述
3)、调整分区函数
在PairRDDFunctions(此类专门针对RDD中数据类型为KeyValue对提供函数)工具类中
partitionBy函数:
在这里插入图片描述
范例演示代码,适当使用函数调整RDD分区数目:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中分区函数,调整RDD分区数目,可以增加分区和减少分区
*/
object SparkPartitionTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 读取本地文件系统文本文件数据
val datasRDD: RDD[String] = sc.textFile("datas/wordcount/wordcount.data", minPartitions = 2)
// TODO: 增加RDD分区数
val etlRDD: RDD[String] = datasRDD.repartition(3)
println(s"EtlRDD 分区数目 = ${etlRDD.getNumPartitions}")
// 词频统计
val resultRDD: RDD[(String, Int)] = etlRDD
// 数据分析,考虑过滤脏数据
.filter(line => null != line && line.trim.length > 0)
// 分割单词,注意去除左右空格
.flatMap(line => line.trim.split("\\s+"))
// 转换为二元组,表示单词出现一次
.mapPartitions{iter =>
iter.map(word => (word, 1))
}
// 分组聚合,按照Key单词
.reduceByKey((tmp, item) => tmp + item)
// 输出结果RDD
resultRDD
// TODO: 对结果RDD降低分区数目
.coalesce(1)
.foreachPartition(iter => iter.foreach(println))
// 应用程序运行结束,关闭资源
sc.stop()
}
}

在实际开发中,什么时候适当调整RDD的分区数目呢?让程序性能更好好呢????

第一点:增加分区数目

  • 当处理的数据很多的时候,可以考虑增加RDD的分区数目
    在这里插入图片描述

第二点:减少分区数目

  • 其一:当对RDD数据进行过滤操作(filter函数)后,考虑是否降低RDD分区数目
    在这里插入图片描述
  • 其二:当对结果RDD存储到外部系统
    在这里插入图片描述

聚合函数

在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是其中聚合函数的使用。

集合中聚合函数
回顾列表List中reduce聚合函数核心概念:聚合的时候,往往需要聚合中间临时变量。查看列表List中聚合函数reduce和fold源码如下:
在这里插入图片描述
通过代码,看看列表List中聚合函数使用:
在这里插入图片描述
运行截图如下所示:
在这里插入图片描述
fold聚合函数,比reduce聚合函数,多提供一个可以初始化聚合中间临时变量的值参数:
在这里插入图片描述
聚合操作时,往往聚合过程中需要中间临时变量(到底时几个变量,具体业务而定),如下案例:
在这里插入图片描述

RDD 中聚合函数
在RDD中提供类似列表List中聚合函数reduce和fold,查看如下:
在这里插入图片描述
案例演示:求列表List中元素之和,RDD中分区数目为2,核心业务代码如下:
在这里插入图片描述
运行原理分析:
在这里插入图片描述
使用RDD中fold聚合函数:
在这里插入图片描述
查看RDD中高级聚合函数aggregate,函数声明如下:
在这里插入图片描述
业务需求:使用aggregate函数实现RDD中最大的两个数据,分析如下:
在这里插入图片描述
核心业务代码如下:
在这里插入图片描述
运行结果原理剖析示意图:
在这里插入图片描述

上述完整范例演示代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* RDD中聚合函数:reduce、aggregate函数
*/
object SparkAggTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 模拟数据,1 到 10 的列表,通过并行方式创建RDD
val datas = 1 to 10
val datasRDD: RDD[Int] = sc.parallelize(datas, numSlices = 2)
// 查看每个分区中的数据
datasRDD.foreachPartition{iter =>
println(s"p-${TaskContext.getPartitionId()}: ${iter.mkString(", ")}")
}
println("=========================================")
// 使用reduce函数聚合
val result: Int = datasRDD.reduce((tmp, item) => {
println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
tmp + item
})
println(result)
println("=========================================")
// 使用fold函数聚合
val result2: Int = datasRDD.fold(0)((tmp, item) => {
println(s"p-${TaskContext.getPartitionId()}: tmp = $tmp, item = $item")
tmp + item
})
println(result2)
println("=========================================")
// 使用aggregate函数获取最大的两个值
val top2: mutable.Seq[Int] = datasRDD.aggregate(new ListBuffer[Int]())(
// 分区内聚合函数,每个分区内数据如何聚合 seqOp: (U, T) => U,
(u, t) => {
println(s"p-${TaskContext.getPartitionId()}: u = $u, t = $t")
// 将元素加入到列表中
u += t //
// 降序排序
val top = u.sorted.takeRight(2)
// 返回
top
},
// 分区间聚合函数,每个分区聚合的结果如何聚合 combOp: (U, U) => U
(u1, u2) => {
println(s"p-${TaskContext.getPartitionId()}: u1 = $u1, u2 = $u2")
u1 ++= u2 // 将列表的数据合并,到u1中
//
u1.sorted.takeRight(2)
}
)
println(top2)
// 应用程序运行结束,关闭资源
sc.stop()
}
}

PairRDDFunctions 聚合函数
在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数:reduceByKey、groupByKey等。*ByKey函数:将相同Key的Value进行聚合操作的,省去先分组再聚合。

第一类:分组函数groupByKey
在这里插入图片描述
第二类:分组聚合函数reduceByKey和foldByKey
在这里插入图片描述
但是reduceByKey和foldByKey聚合以后的结果数据类型与RDD中Value的数据类型是一样的。

第三类:分组聚合函数aggregateByKey
在这里插入图片描述
在企业中如果对数据聚合使用,不能使用reduceByKey完成时,考虑使用aggregateByKey函数,基本上都能完成任意聚合功能。

演示范例代码如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* RDD中聚合函数,针对RDD中数据类型Key/Value对:
* groupByKey
* reduceByKey/foldByKey
* aggregateByKey
* combineByKey
*/
object SparkAggByKeyTest {
def main(args: Array[String]): Unit = {
// 创建应用程序入口SparkContext实例对象
val sc: SparkContext = {
// 1.a 创建SparkConf对象,设置应用的配置信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// 1.b 传递SparkConf对象,构建Context实例
new SparkContext(sparkConf)
}
sc.setLogLevel("WARN")
// 1、并行化集合创建RDD数据集
val linesSeq: Seq[String] = Seq(
"hadoop scala hive spark scala sql sql", //
"hadoop scala spark hdfs hive spark", //
"spark hdfs spark hdfs scala hive spark" //
)
val inputRDD: RDD[String] = sc.parallelize(linesSeq, numSlices = 2)
// 2、分割单词,转换为二元组
val wordsRDD: RDD[(String, Int)] = inputRDD
.flatMap(line => line.split("\\s+"))
.map(word => word -> 1)
// TODO: 先使用groupByKey函数分组,再使用map函数聚合
val wordsGroupRDD: RDD[(String, Iterable[Int])] = wordsRDD.groupByKey()
val resultRDD: RDD[(String, Int)] = wordsGroupRDD.map{ case (word, values) =>
val count: Int = values.sum
word -> count
}
println(resultRDD.collectAsMap())
// TODO: 直接使用reduceByKey或foldByKey分组聚合
val resultRDD2: RDD[(String, Int)] = wordsRDD.reduceByKey((tmp, item) => tmp + item)
println(resultRDD2.collectAsMap())
val resultRDD3 = wordsRDD.foldByKey(0)((tmp, item) => tmp + item)
println(resultRDD3.collectAsMap())
// TODO: 使用aggregateByKey聚合
/*
def aggregateByKey[U: ClassTag]
(zeroValue: U) // 聚合中间临时变量初始值,类似fold函数zeroValue
(
seqOp: (U, V) => U, // 各个分区内数据聚合操作函数
combOp: (U, U) => U // 分区间聚合结果的聚合操作函数
): RDD[(K, U)]
*/
val resultRDD4 = wordsRDD.aggregateByKey(0)(
(tmp: Int, item: Int) => {
tmp + item
},
(tmp: Int, result: Int) => {
tmp + result
}
)
println(resultRDD4.collectAsMap())
// 应用程序运行结束,关闭资源
Thread.sleep(1000000)
sc.stop()
}
}

面试题
RDD中groupByKey和reduceByKey区别???

  • reduceByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
    在这里插入图片描述
  • groupByKey函数:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的函数,将相同key的值聚合到一起,与reduceByKey的区别是只生成一个sequence。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/334623.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

回归预测 | MATLAB实现PSO-LSSVM粒子群算法优化最小二乘支持向量机多输入单输出

回归预测 | MATLAB实现PSO-LSSVM粒子群算法优化最小二乘支持向量机多输入单输出 目录回归预测 | MATLAB实现PSO-LSSVM粒子群算法优化最小二乘支持向量机多输入单输出预测效果基本介绍模型描述程序设计参考资料预测效果 基本介绍 MATLAB实现PSO_LSSVM粒子群算法优化最小二乘支持…

车道线检测-LaneATT 论文学习笔记

论文:《Keep your Eyes on the Lane: Real-time Attention-guided Lane Detection》 地址:https://arxiv.org/abs/2010.12035v2 代码:https://github.com/lucastabelini/LaneATT 整体结构 车道线的表示方式 Lane{(xi,yi)}i0Npts−1,yii⋅Hima…

【23种设计模式】结构型模式详细介绍

前言 本文为 【23种设计模式】结构型模式 相关内容介绍,下边将对适配器模式,桥接模式,组合模式,装饰模式,外观模式,亨元模式,代理模式,具体包括它们的特点与实现等进行详尽介绍~ &a…

mysql物理innobackupex备份脚本和自动备份脚本

目录 备份命令 恢复命令 自动备份脚本 innobackupex是一款MySQL备份工具,备份速度快(通过直接copy物理文件),而且支持压缩、流式传输、加密等功能 新安装的数据库自带innobackupex,如果找不到命令,需安装percona-xtrabackup-2…

python数据结构:数组、链表、栈、队列、树

目录1.数组1.1 数组的数据结构1.1.1 数组的定义1.1.2 随机访问和连续内存1.1.3 静态内存和动态内存1.1.4 物理大小和逻辑大小1.2 数组的操作1.2.1 增加数组大小1.2.2 减小数组大小1.2.3 插入一项1.2.4 删除一项1.2.5 复杂度权衡1.3 二维数组2.链表2.1 链表分类2.2 链表特点2.3 …

OKCC呼叫中心使用中常见问题及处理方法

经常有客户咨询在使用OKCC呼叫中心系统时遇到的一些常见但不复杂的问题,下面整理了一些问题和处理方法给伙伴们参考:一、外呼任务为何启动后会自动暂停?1.检查该账户余额是否充足;2.外呼任务班组中是否有空闲坐席;3.分…

终于找到blender渲染总是崩溃的原因了

如果您开始渲染,Blender 会崩溃,并在渲染过程中自动关闭,可能是由于这两个主要原因之一。Blender 用完了可用内存显卡有问题在本文中,我们将了解如何处理 Blender 在渲染时崩溃的情况。Blender内存不足如果我们从 RAM 问题开始。要…

卸载Node.js

0 写在前面 无论您是因为什么原因要卸载Node.js都必须要卸载干净。 请阅读: 1 卸载步骤 1.1通过控制面板卸载node.js winR—>control.exe—>卸载程序—>卸载Node.js 等待—>卸载成功 1.2 删除安装时的nodejs文件夹 通过记忆或者Everthing搜索找…

《MySQL学习》 事务隔离 与 MVCC

《MySQL学习》 事务隔离 一.事务的概念 事务保证一组数据要么全部成功要么全部失败,MySQL的事务基于引擎(如InnoDB)实现。 二.事务的隔离性与隔离级别 MySQL的标准隔离级别: 读未提交 : 一个事务还没提交时&#…

网络变压器与不同芯片之间的匹配原则及POE通讯产品需要注意哪些方面

Hqst盈盛电子导读:网络变压器与不同芯片之间的匹配原则及POE通讯产品需要注意哪些方面网络变压器与不同芯片之间的匹配原则:一,电流型PHY芯片一般要配的网络变压器:1、变压器PHY侧3线共模电感 (更适合POE产品&#xff…

Nginx_3

Rewrite功能配置 Rewrite是Nginx服务器提供的一个重要基本功能,是Web服务器产品中几乎必备的功能。主要的作用是用来实现URL的重写。www.jd.com 注意:Nginx服务器的Rewrite功能的实现依赖于PCRE的支持,因此在编译安装Nginx服务器之前,需要安…

Thinkphp大型进销存ERP源码/ 进销存APP源码/小程序ERP系统/含VUE源码支持二次开发

框架:ThinkPHP5.0.24 uniapp 包含:服务端php全套开源源码,uniapp前端全套开源源码(可发布H5/android/iod/微信小程序/抖音小程序/支付宝/百度小程序) 注:这个是全开源,随便你怎么开,怎么来&a…

DHCP基础

DHCP基础产生背景传统手工配置的缺陷DHCP基本概念及优点DHCP工作原理DHCP租期更新DHCP相关配置命令解析产生背景 因为现在上网的人已经变得非常多了,上网又需要分配IP地址,那么如何进行IP地址的分配?如果是手动分配IP地址,在人数…

深眸科技以科技赋能智慧物流搭建,实现周转箱拆垛作业智能化

数字化时代下市场竞争的核心要素转化为科技的竞争,智能化技术的投入是企业占据市场竞争绝对优势的重要支撑。深眸科技凭借轻辙视觉引擎实现周转箱拆垛作业的智能化突破。人力成本增加,企业积极转变特别是在后疫情时代,人力成本迅猛增加&#…

Docker进阶 - 7. docker network 网络模式之 host

目录 1. host 模式概述 2. host模式代码语法 3. docker inspect 查看 bridge/host模式 容器元数据 4. ip addr 进入 tomcat83 (host模式) 容器内部查看容器ip 5. 如何访问启动 tomcat83 (host模式) 1. host 模式概述 直接使用宿主机的IP地址与外界进行通信,不…

警惕!爆火的ChatGPT 暗藏的安全隐患 数字信息的未来

近段时间以来,ChatGPT 在各大平台网站是刷屏一般的存在,随之而来的各式各样的赞美与吁叹,更是不断地勾起人们的好奇心理。但在几天铺天盖地式的营销之后,ChatGPT 的舆论在2月7日晚的舆论风口就发现了极大的转变,各平台…

2010-2019年290个城市经济发展与环境污染数据

2010-2019年290个城市经济发展与环境污染数据 1、时间:2010-2019年 2、统计口径:全市 3、来源:城市统计NJ,缺失情况与年鉴一致 4、指标包括: 综合经济:地区生产总值、人均地区生产总值、地区生产总值增…

云原生系列之使用 prometheus监控远程主机实战

文章目录前言一. 实验环境二. 安装node_exporter2.1 node_exporter的介绍2.2 node_exporter的安装三. 在prometheus服务端配置监控远程主机3.1 在server端配置拉取node的信息3.2 重启prometheus3.3 通过浏览器查看prometheus总结前言 大家好,又见面了,我…

Pyqt5小案例,界面与逻辑分离的小计算器程序

直接看下最终效果: 使用技术总结 使用Designer设计界面 使用pyuic5命令导出到python文件 新建逻辑处理文件,继承pyuic5导出的文件的类,在里面编写信号与槽的处理逻辑 使用Designer设计界面 要使用Designer,安装一个Python库即…

Qml学习——鼠标事件处理MouseArea

最近在学习Qml,但对Qml的各种用法都不太熟悉,总是会搞忘,所以写几篇文章对学习过程中的遇到的东西做一个记录。 学习参考视频:https://www.bilibili.com/video/BV1Ay4y1W7xd?p1&vd_source0b527ff208c63f0b1150450fd7023fd8 其…