Spark 任务调度机制

news2025/8/14 19:29:04

1.Spark任务提交流程

Spark YARN-Cluster模式下的任务提交流程,如下图所示:

 图YARN-Cluster任务提交流程

下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程:

图Spark任务提交时序图

提交一个Spark应用程序,首先通过Client向ResourceManager请求启动一个Application,同时检查是否有足够的资源满足Application的需求,如果资源条件满足,则准备ApplicationMaster的启动上下文,交给ResourceManager,并循环监控Application状态。

当提交的资源队列中有资源时,ResourceManager会在某个NodeManager上启动ApplicationMaster进程,ApplicationMaster会单独启动Driver后台线程,当Driver启动后,ApplicationMaster会通过本地的RPC连接Driver,并开始向ResourceManager申请Container资源运行Executor进程(一个Executor对应与一个Container),当ResourceManager返回Container资源,ApplicationMaster则在对应的Container上启动Executor。

Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

从上述时序图可知,Client只负责提交Application并监控Application的状态。对于Spark的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是通过ApplicationMaster、Driver以及Executor之间来完成。

2.Spark任务调度概述

当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源情况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:

  1. Job是以Action方法为界,遇到一个Action方法则触发一个Job;
  2. Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle做一次划分;
  3. Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度,总体调度流程如下图所示:

 Spark任务调度概览

Spark RDD通过其Transactions操作,形成了RDD血缘关系图,即DAG,最后通过Action的调用,触发Job并调度执行。DAGScheduler负责Stage级的调度,主要是将job切分成若干Stages,并将每个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不同的资源管理系统。有了上述感性的认识后,下面这张图描述了Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver以及Executor内部模块的交互过程:

 模块交互过程

 Job提交和Task拆分

Driver初始化SparkContext过程中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler。

3.Spark Stage级调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

 Job提交调用栈

Job由最终的RDD和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算,如上图紫色流程部分。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。

 WordCount实例

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,即ShuffleMapStage中,实际执行的时候,数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索算法。

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

4.Spark Task级调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。

 TaskManager结构

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务

 前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余量,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示:

 

ask调度流程

        将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task

​​​​​​​4.1调度策略

前面讲到,TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里,然后再从任务队列里按照一定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上还是比较粗粒度的,是面向TaskSetManager的

TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schdulable,叶子节点为TaskSetManager,非叶子节点为Pool,下图是它们之间的继承关系。

 

任务队列继承关系

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,表示树的根节点,是Pool类型。

  1. FIFO调度策略

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。

 

FIFO调度策略内存结构

  1. FAIR调度策略

FAIR调度策略的树结构如下图所示:

FAIR调度策略内存结构

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。

在FAIR模式中需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetManager都继承了Schedulable特质,因此使用相同的排序算法

排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

  1. 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行
  2. 如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行
  3. 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行
  4. 如果上述比较均相等,则比较名字。

整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行

FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

​​​​​​​4.2本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()得到partition 的优先位置,由于一个partition对应一个task,此partition的优先位置就是task的优先位置对于要提交到TaskScheduler的TaskSet中的每一个task,该task优先位置与其对应的partition对应的优先位置一致

从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的所有task,并负责管理调度这些task。

根据每个task的优先位置,确定task的Locality级别,Locality一共有五种,优先级由高到低顺序:

Spark本地化等级

名称

解析

PROCESS_LOCAL

进程本地化,task和数据在同一个Executor中,性能最好。

NODE_LOCAL

节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输。

RACK_LOCAL

机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。

NO_PREF

对于task来说,从哪里获取都一样,没有好坏之分。

ANY

task和数据可以在集群的任何地方,而且不在一个机架中,性能最差。

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。

​​​​​​​4.3失败重试与黑名单机制

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败

 模块交互过程

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386415.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql数据库之存储过程

一、存储过程简介。 存储过程是事先经过编译并存储在数据库中的一段sql语句的集合,调用存储过程可以简化应用开发人员的很多工作,减少数据在数据库和应用服务器之间的传输,对于提高数据处理的效率是也有好处的。 存储过程思想上很简单&…

Mysql常见面试题总结

1、什么是存储引擎 存储引擎指定了表的类型,即如何存储和索引数据,是否支持事务,同时存储引擎也决定了表在计算机中的存储方式。 2、查看数据库支持哪些存储引擎使用什么命令? -- 查看数据库支持的存储引擎 show engines; 或者 …

百趣代谢组学分享,关于儿童Graves病相关的新环境物质的鉴定

代谢组学文章标题:Identification of Novel Environmental Substances Relevant to Pediatric Graves’ Disease 发表期刊:Frontiers in endocrinology 影响因子:6.055 作者单位:苏州大学附属儿童医院 百趣提供服务&#xf…

外贸建站多少钱才能达到预期效果?

外贸建站多少钱才能达到预期效果?这是每个外贸企业都会问的问题。作为一个做外贸建站多年的人,我有一些个人的操盘感想。 首先,我认为外贸建站的投资是非常必要的。 因为在现代社会,网站已经成为外贸企业开展业务的必要工具之一…

3种方法删除7-Zip压缩包的密码

7-Zip压缩软件是一款完全免费且开源的软件,不仅能压缩和解压7-Zip压缩包,还能给压缩包设置打开密码。 有些小伙伴可能会遇到这样的问题,7-Zip压缩包设置密码后,过了一段时间不需要密码保护了,或者一不小心忘记了密码&…

后端快速上手前端三剑客 HtmlCSSJavaScript

文章目录前言HTML1.基础标签2.多媒体标签:3.表格&列表&布局4.表单CSS1.简介2.导入方式3.选择器JavaScript1.简介2.引入方式3.基本语法4.对象(1) 基本对象(2) BOM对象(3) DOM对象5.事件前言 结构:HTML 表现:CSS 行为:Java…

D. Linguistics(思维 + 贪心)

Problem - D - Codeforces Alina发现了一种奇怪的语言,它只有4个单词:a, B, AB, BA。事实也证明,在这种语言中没有空格:一个句子是通过将单词连接成一个字符串来写的。Alina发现了一个这样的句子,她很好奇:有没有可能它恰好由a个单词a, b个单…

EasyExcel You can try specifying the ‘excelType‘ yourself 异常排查与处理

目录 问题发现 报错信息 问题排查 1、确定异常 2、查询easyexcel源码读取文件源码 3、查看业务代码 优化方案 1、将路径获取文件流的方式换为httpclient获取 2、dug测试修改代码 总结 问题发现 在测试环境测试导入订单,发现订单导入提示数据导入异常。 …

Python dict字典全部操作方法

文章目录一. 介绍二. 字典的创建1. 手动创建2. 使用内置函数dict()创建3. 使用dict.fromkeys()方法创建三. 字典元素的读取1. 下标方式读取Value2. dict.get()读取Value3. keys()方法返回“键”4. values()方法返回“值”5. items()方法返回“键-值”对四. 字典元素的添加与修改…

【20230227】回溯算法小结

回溯法又叫回溯搜索法,是搜索的一种方式。回溯法本质是穷举所有可能。如果想让回溯法高效一些,可以加一些剪枝操作。回溯算法解决的经典问题:组合问题切割问题子集问题排列问题棋盘问题如何去理解回溯法?回溯法解决的问题都可以抽…

hadoop调优

hadoop调优 1 HDFS核心参数 1.1 NameNode内存生产配置 1.1.1 NameNode内存计算 每个文件块大概占用150byte,如果一台服务器128G,能存储的文件块如下 128 (G)* 1024(MB) * 1024(KB) * 1024(Byte) / 150 Byte 9.1 亿 1.1.2 Hadoop2.x 在Hadoop2.x中…

Linux--多线程(3)

目录1. POSIX信号量1.1 概念2. 基于环形队列的生产消费者模型2.1 环形队列的基本原理2.2 基本实现思想3. 多生产多消费1. POSIX信号量 1.1 概念 信号量本质是一个计数器,申请了信号量以后,可以达到预定临界资源的效果。 POSIX信号量和SystemV信号量相同…

【自动包装线标签打印翻转问题沟通】

最近纺丝自动包装线的标签打印机自动打印标签,是翻转状态。) 但是这个打印机它不是平放的,它是通过悬臂安装在半空的中的,是翻转的, 它的标签一个打在侧面,一个打在正前方,打印出来的样子是这样的。 是反…

全国媒体邀约怎么做?邀请媒体有哪些注意事项呢?

传媒如春雨,润物细无声,大家好好多企业或者机构都在参加外地的展览展会,活动会议,或者由于多种方面的考虑,会在公司总部以外的地方去做活动和发布会,在一个相对陌生的地方,不论是活动准备&#…

WebRTC → 多人通讯架构浅析

1、一对一通信模型一对一通信中,WebRTC会先尝试两个终端之间是否可以通过P2P直接进行通信,无法通信时会通过STUN/TURN服务器进行中转;其中STUN/TURN服务器的作用在不能直连时是中继服务器,通过该服务器进行端到端之间的数据中转&a…

数据挖掘多模块接口(二分类)python旗舰版

数据挖掘任务一般分为四大步骤:1、数据预处理2、特征选择3、模型训练与测试4、模型评估本文为四大步骤提供接口,使得能够快速进行一个数据挖掘多种任务中,常见的二分类任务。0. 导包0.1 忽略警告信息:import warnings warnings.fi…

【Java学习笔记】2.Java 开发环境配置

Java 开发环境配置 在本章节中我们将为大家介绍如何搭建Java开发环境。 window系统安装java 下载JDK 首先我们需要下载 java 开发工具包 JDK,下载地址:https://www.oracle.com/java/technologies/downloads/,在下载页面中根据自己的系统选…

企业寄件现代化管理教程

现代化企业为了跟上时代发展的步伐,在不断完善着管理制度,其中公司寄件管理,也是重要的一个模块。为了提高公司快递的寄件效率,以及节约寄件成本,实现快递寄件的规范化,越来越多的现代化企业,开…

ES linux 环境下安装问题集锦

1: 所有的环境配置安装完成后验证: curl -u elastic http://127.0.0.1:9400 输入密码;2:错误1:解决方法:配置连接的用户名与密码; 重启ES 错误2:[1]: max number of threads [2048] …

前端性能优化:浏览器的2种缓存方式,你了解吗?

在前端性能优化中,最重要的就是缓存,使用缓存可以极大的提升浏览器的响应速率。什么是缓存呢?当我们第一次访问某个网站时,浏览器会把网站中的图片等资源存储在电脑中,以备后续使用,第二次访问该网站时&…