Spark-序列化、依赖关系、持久化

news2025/7/19 14:26:27

序列化

闭包检查

序列化方法和属性

依赖关系 

RDD 血缘关系

RDD 窄依赖

RDD 宽依赖

RDD 任务划分

RDD 持久化

RDD Cache 缓存

RDD CheckPoint 检查点

缓存和检查点区别


序列化

闭包检查

        从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就 形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列 化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

序列化方法和属性

        从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。

object spark_02 {
  def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))


    //创建查询对象
    val search = new Search("h")
    //函数传递,打印:ERROR Task not serializable
    search.getMatch1(rdd).collect().foreach(println)

    println("========================================")
    
    //属性传递,打印:ERROR Task not serializable
    search.getMatch2(rdd).collect().foreach(println)
    
    //关闭环境
    sc.stop()
  }
}

//查询对象
//类的构造参数是类的属性,构造参数需要进行闭包检查(对类进行闭包检查)
class Search(query:String) extends Serializable {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  // 函数序列化案例
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }
  // 属性序列化案例
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

 

依赖关系 

        相邻的两个RDD关系称之为依赖关系

RDD 血缘关系

        多个连续的RDD的依赖关系称之为血缘关系

        RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。

val fileRDD: RDD[String] = sc.textFile("input/1.txt")
println(fileRDD.toDebugString) //打印输出血缘关系
println("----------------------")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println("----------------------")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println("----------------------")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
resultRDD.collect()

RDD 窄依赖

        窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用, 窄依赖我们形象的比喻为独生子女。

RDD 宽依赖

        宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会 引起 Shuffle,总结:宽依赖我们形象的比喻为多生。

RDD 任务划分

        RDD 任务切分中间分为:Application、Job、Stage 和 Task:

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

RDD 持久化

RDD Cache 缓存

        RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存 在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

        缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机 制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数 据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部 Partition。 Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样 做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时 候,如果想重用数据,仍然建议调用 persist 或 cache。

RDD CheckPoint 检查点

        所谓的检查点其实就是通过将 RDD 中间结果写入磁盘 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点 之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")
// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
 word => {
 (word, System.currentTimeMillis())
 }
}
// 增加缓存,避免再重新跑一个 job 做 checkpoint
wordToOneRdd.cache()
// 数据检查点:针对 wordToOneRdd 做检查点计算
wordToOneRdd.checkpoint()
// 触发执行逻辑
wordToOneRdd.collect().foreach(println)

缓存和检查点区别

  1. Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
  2. Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存 储在 HDFS 等容错、高可用的文件系统,可靠性高。
  3. 建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存 中读取数据即可,否则需要再从头计算一次 RDD。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/404824.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Lesson 9.3 集成算法的参数空间与网格优化和使用网格搜索在随机森林上进行调参

文章目录一、集成算法的参数空间与网格优化1. 学习曲线2. 决策树对象 Tree二、使用网格搜索在随机森林上进行调参1. 建立 benchmark2. 创建参数空间3. 实例化用于搜索的评估器、交叉验证评估器与网格搜索评估器4. 训练网格搜索评估器5. 查看结果在开始学习之前,先导…

拆解DKD loss (建议读完论文哈)

论文链接:https://arxiv.org/abs/2203.08679def dkd_loss(logits_student, logits_teacher, target, alpha, beta, temperature):gt_mask _get_gt_mask(logits_student, target) # 获取掩码other_mask _get_other_mask(logits_student, target)pred_student …

自动化测试——数据驱动测试

数据驱动测试 在实际的测试过程中,我们会发现好几组用例都是相同的操作步骤,只是测试数据的不同,而我们往往需要编写多次用例来进行测试,此时我们可以利用数据驱动测试来简化该种操作。 参数化: 输入数据的不同从而产…

一篇文章教你彻底理解ThreadLocal

文章目录ThreadLocal是什么?ThreadLocal如何使用?特别注意ThreadLocal数据存储存取ThreadLocal原理解析Thread.threadLocals原理Thread.inheritableThreadLocals原理ThreadLocal内存泄漏内存泄漏原因对内存泄漏的补救用完就要删除(最终解决&a…

统一流程平台----执行流应用

在flowable平台中,执行流(Execution)完成了流程实例/执行分支/任务/子流程之间关系的建立。flowable整个体系以执行流为基础,完成上下游数据的关联,让bpmn图纸能按照约定进行流转,形成了第一层概念。1.执行…

vue3实战项目安装各种爆时候报错问题和解决

文章目录1.安装:npm install -g sass报错问题1.npm install失败,报错如下引入使用echarts 相关问题1. vue3中npm install echarts --save报错但是这个地方有报提示,问题待解决...........1.安装:npm install -g sass 注释: 多种安装方法 2.vue中局部引用,也可以设置全局css文件…

C++:vector和list的迭代器区别和常见迭代器失效问题

迭代器常见问题的汇总vector迭代器和list迭代器的使用vector迭代器list迭代器vector迭代器失效问题list迭代器失效问题vector和list的区别vector迭代器和list迭代器的使用 学习C,使用迭代器和了解迭代器失效的原因是每个初学者都需要掌握的,接下来我们就…

C++代码格式化-clang-format

文章目录前言c|vscode|clang-formatc|vs|clang-formatc|.clang-format其他附录Visual Studio格式在vs和vscode中不同无法从繁体切换到简体我的vs code配置前言 一个项目中的代码,可能来自不同的地方。不管是多人合作,还是ctrl-c/ctrl-v,都有…

剑指offer JZ6 从尾到头打印链表

Java 剑指offer JZ6 从尾到头打印链表 文章目录Java 剑指offer JZ6 从尾到头打印链表一、题目描述二、递归写法三、栈方法使用Java的递归和栈解决从尾到头打印链表的问题 一、题目描述 输入一个链表的头节点,按链表从尾到头的顺序返回每个节点的值(用数组…

spring cloud @RefreshScope 刷新机制

在学习 nacos 的配置修改发现用到了 RefreshScope 注解,将 spring boot 的日志调整如下logging:level:com:alibaba:cloud: debugorg:springframework:context: debugcloud: debug调用 nacos 的配置修改,看到如下信息2023-03-10 15:48:15.332 INFO [com.a…

三天吃透MySQL面试八股文

本文已经收录到Github仓库,该仓库包含计算机基础、Java基础、多线程、JVM、数据库、Redis、Spring、Mybatis、SpringMVC、SpringBoot、分布式、微服务、设计模式、架构、校招社招分享等核心知识点,欢迎star~ Github地址:https://github.com/…

MGRE综合实验

实验拓扑及相关要求: IP地址配置: ip规划如该拓扑上可视 缺省路由: [r1]ip route-static 0.0.0.0 0 15.0.0.2 [r2]ip route-static 0.0.0.0 0 25.0.0.2 [r3]ip route-static 0.0.0.0 0 35.0.0.2 [r4]ip route-static 0.0.0.0 0 45.0.0.2 公…

Java的二叉树、红黑树、B+树

数组和链表是常用的数据结构,数组虽然查找快(有序数组可以通过二分法查找),但是插入和删除是比较慢的;而链表,插入和删除很快(只需要改变一些引用值),但是查找就很慢&…

游戏引擎开发总结:序列化系统

序列化需要准备什么?首先,我们需要一个被序列化类实现序列化函数,以及序列化库。准备我的序列化库是Yaml-cpp,序列话函数就命名为Serialize,另外我们不需要关心组件类型是具体是什么,所以我这边使用多态&am…

Spring和MaBatis整合

Spring和MyBatis整合: 先瞅一眼各种文件路径: 将之前mybatis中的测试类中的SqlSessionFactory(通过其openSession()来获得对象SqlSession),和Mybatis配置文件中的数据源(url,username等&#…

【Java爬虫】HttpClient+Jsoup实现爬取校内新闻

警告网络爬虫作为一门技术,在使用过程中,应该遵守Robots协议。采集数据时应注意礼貌,不允许爬的网站尽量不要短时间大频率爬取,涉及hdd的网站更是不要去满足自己的好奇心,不然说不准哪天就和吴签一起吃大碗宽面了...介…

[洛谷-P2585][ZJOI2006]三色二叉树(树形DP+状态机DP)

[洛谷-P2585][ZJOI2006]三色二叉树(树形DP状态机DP)一、题目题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示数据规模与约定二、分析1、递归建树2、树形DP 状态机DP(1)状态表示(2)状态转移三、…

C++11异步编程

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录前言1、std::future和std::shared_future1.1 std:future1.2 std::shared_future2、std::async3、std::promise4、std::packaged_task前言 C11提供了异步操作相关的类…

Vue3电商项目实战-结算支付 2【03-结算-对话框组件封装、04-结算-收货地址-切换】

文章目录03-结算-对话框组件封装04-结算-收货地址-切换03-结算-对话框组件封装 目的:实现一个对话框组件可设置标题,动态插入内容,动态插入底部操作按钮,打开关闭功能。 大致步骤: 参照xtx-confirm定义一个基础布局实…

MFC常用控件使用(文本框、编辑框、下拉框、列表控件、树控件)

简介 本文章主要介绍下MFC常用控件的使用,包括静态文本框(Static Text)、编辑框(Edit Control)、下拉框(Combo Box)、列表控件(List Control)、树控件(Tree Control)的使用。 创建项目 我们选择 文件->新建->新建项目,选择MFC程序 选择基于对话…