Spark 2.4.0 Source Code: TaskScheduler
Overview
TaskScheduler defines the interface specification for task scheduling. Spark currently has only one implementation, TaskSchedulerImpl, which receives the TaskSets sent by DAGScheduler, allocates resources to their tasks according to the resource scheduling algorithm, and submits the tasks to executors for execution.
TaskSchedulerImpl implements speculative execution and locality-aware task placement through TaskSetManager. Speculative execution means that when a slow-running task is detected, the task is also launched on another executor and the result of whichever attempt finishes first is used, reducing the impact a straggler has on overall job progress. The task locality algorithm sends a task to the executor closest to the data the task will process, reducing network data transfer. Spark currently supports five locality levels: PROCESS_LOCAL (same process), NODE_LOCAL (same node), NO_PREF (no preference), RACK_LOCAL (same rack), and ANY (anywhere).
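Because these five levels form an ordered hierarchy, Spark can compare them directly. Below is a minimal sketch of that modeling, mirroring org.apache.spark.scheduler.TaskLocality; the object name here is illustrative, not Spark's:
// A minimal sketch: the locality levels as an ordered enumeration,
// where smaller values are closer to the data.
object TaskLocalitySketch extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A candidate level satisfies a constraint if it is at least as local
  def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
}
// Example: NODE_LOCAL is acceptable under a RACK_LOCAL constraint,
// but not under a PROCESS_LOCAL constraint:
//   TaskLocalitySketch.isAllowed(TaskLocalitySketch.RACK_LOCAL, TaskLocalitySketch.NODE_LOCAL)    // true
//   TaskLocalitySketch.isAllowed(TaskLocalitySketch.PROCESS_LOCAL, TaskLocalitySketch.NODE_LOCAL) // false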
TaskSchedulerImpl also depends on a backend interface, SchedulerBackend: allocating resources to tasks is actually done through this backend.
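The core of that interface is small; the following is an abridged version of the SchedulerBackend trait, showing only the methods this article touches:
// Abridged from org.apache.spark.scheduler.SchedulerBackend; methods such as
// killTask and applicationId are omitted here.
private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  // Ask the backend to offer its free resources, so pending tasks get launched
  def reviveOffers(): Unit
  def defaultParallelism(): Int
}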
Initialization and Startup of TaskSchedulerImpl
The SparkContext created by a Spark application internally creates a TaskSchedulerImpl and calls its initialize and start methods.
rootPool, shown below, is one of TaskSchedulerImpl's fields; the root scheduling pool is created when TaskSchedulerImpl is constructed. TaskSchedulerImpl's task scheduling depends on the scheduling pool Pool: every task that needs to be scheduled is placed into a pool, the pool orders the TaskSets according to the scheduling algorithm (FIFO or FAIR), and the TaskSets it selects are handed back to TaskSchedulerImpl for resource allocation.
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
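As an illustration of the FIFO case, here is a self-contained sketch of the comparator the pool uses to order its entries, modeled on Spark's FIFOSchedulingAlgorithm; the Entry type is illustrative. A lower priority (an earlier job) wins, and ties between TaskSets of the same job are broken by stage ID:
object FifoComparatorSketch {
  case class Entry(priority: Int, stageId: Int)

  // Returns true if s1 should be scheduled before s2
  def fifoLessThan(s1: Entry, s2: Entry): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  def main(args: Array[String]): Unit = {
    val sorted = Seq(Entry(2, 5), Entry(1, 3), Entry(1, 1)).sortWith(fifoLessThan)
    println(sorted) // List(Entry(1,1), Entry(1,3), Entry(2,5))
  }
}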
The initialize and start methods
def initialize(backend: SchedulerBackend) {
  // Assign the SchedulerBackend created inside SparkContext to the backend field
  this.backend = backend
  // Create a different pool builder depending on the scheduling mode
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  // Build the scheduling pools
  schedulableBuilder.buildPools()
}

override def start() {
  // Start the SchedulerBackend
  backend.start()
  // If the application is not in local mode and speculative execution is enabled,
  // set up a timer that checks for speculatable tasks every SPECULATION_INTERVAL_MS (100 ms)
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        // Check for tasks eligible for speculative execution
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
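For reference, the wiring done by SparkContext looks roughly like this; it is a simplified sketch of what SparkContext.createTaskScheduler does for local mode, not the verbatim source:
// Simplified sketch, assuming local mode; SparkContext actually chooses the
// SchedulerBackend implementation by parsing the master URL.
val scheduler = new TaskSchedulerImpl(sc)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, totalCores = 1)
scheduler.initialize(backend) // choose the pool builder and build the pools
scheduler.start()             // starts the backend and the speculation timer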
Task Submission
When DAGScheduler submits a TaskSet in submitMissingTasks, it internally calls the taskScheduler.submitTasks method, implemented as follows:
override def submitTasks(taskSet: TaskSet) {
  // Get the tasks of this TaskSet
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    // Conflict check: taskSetsByStageIdAndAttempt must not contain another
    // active TaskSet that belongs to the same stage
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map(_._2.taskSet.id).mkString(",")}")
    }
    // Add the newly created TaskSetManager to the pool created by the pool builder
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    // If the application is not in local mode and no task has been received yet,
    // set a timer that checks TaskSchedulerImpl for starvation at the interval
    // given by STARVATION_TIMEOUT_MS
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    // Mark that TaskSchedulerImpl has received tasks
    hasReceivedTask = true
  }
  // Allocate resources to the tasks and run them
  backend.reviveOffers()
}
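For context, the calling side in DAGScheduler.submitMissingTasks looks roughly like this; it is abridged and reproduced from memory, so treat it as a sketch rather than the exact source:
// Abridged sketch: DAGScheduler wraps the stage's missing tasks in a TaskSet
// and hands it to the TaskScheduler, which triggers the submitTasks flow above.
if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage")
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}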
The reviveOffers() method is what the SchedulerBackend uses to allocate resources to tasks and run them. This backend is not created by TaskSchedulerImpl itself; instead, when SparkContext is created, a different implementation of SchedulerBackend is instantiated according to the deploy mode and passed to TaskSchedulerImpl.
Taking local mode as an example, LocalSchedulerBackend.reviveOffers actually sends a ReviveOffers message to LocalEndpoint; LocalEndpoint then calls its own reviveOffers method, which in turn calls TaskSchedulerImpl.resourceOffers to allocate resources to tasks, and finally calls Executor.launchTask to load and attempt to run each task. The implementation is as follows:
def reviveOffers() {
  // Create a sequence containing a single WorkerOffer (a case class); in local
  // mode its executor ID is localExecutorId ("driver"), its host is
  // localExecutorHostname ("localhost"), and its cores are the currently free cores
  val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
    Some(rpcEnv.address.hostPort)))
  // Call TaskSchedulerImpl.resourceOffers to allocate resources to tasks
  for (task <- scheduler.resourceOffers(offers).flatten) {
    // Decrease the number of free CPU cores by CPUS_PER_TASK (1 by default)
    freeCores -= scheduler.CPUS_PER_TASK
    // Call Executor.launchTask to load and run the task
    executor.launchTask(executorBackend, task)
  }
}
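Before looking at the single-task method below, it helps to see the shape of TaskSchedulerImpl.resourceOffers: it asks the root pool for TaskSetManagers in scheduling order, then walks the locality levels from most to least local, offering each WorkerOffer's cores until no more tasks can be launched. A self-contained toy of that loop follows; all types and names here are illustrative, not Spark's:
import scala.collection.mutable.ArrayBuffer

object ResourceOffersToy {
  case class Offer(executorId: String, host: String, var freeCores: Int)
  case class Task(id: Int, preferredHost: Option[String])

  val CPUS_PER_TASK = 1

  def offerResources(offers: Seq[Offer], pending: ArrayBuffer[Task]): Seq[(Int, String)] = {
    val launched = ArrayBuffer.empty[(Int, String)]
    // Two "locality levels": first NODE_LOCAL (preferred host matches), then ANY
    for (requireLocal <- Seq(true, false); offer <- offers) {
      var keepGoing = true
      while (keepGoing && offer.freeCores >= CPUS_PER_TASK) {
        val idx = pending.indexWhere { t =>
          if (requireLocal) t.preferredHost.contains(offer.host) else true
        }
        if (idx >= 0) {
          // Launch the task on this offer and consume its cores
          val task = pending.remove(idx)
          offer.freeCores -= CPUS_PER_TASK
          launched += ((task.id, offer.executorId))
        } else {
          keepGoing = false
        }
      }
    }
    launched
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec1", "host1", 2), Offer("exec2", "host2", 1))
    val pending = ArrayBuffer(Task(0, Some("host2")), Task(1, Some("host1")), Task(2, None))
    println(offerResources(offers, pending))
    // Task 1 lands on exec1 (host1), task 0 on exec2 (host2),
    // and task 2 fills exec1's leftover core in the ANY pass
  }
}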
Inside resourceOffers, TaskSchedulerImpl ultimately delegates to TaskSetManager.resourceOffer, which tries to assign a single task to the given executor:
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  // Check the blacklist
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  // Proceed only if this TaskSetManager is not a zombie and neither the host
  // nor the executor being offered is blacklisted
  if (!isZombie && !offerBlacklisted) {
    // Get the current time and the maximum locality level
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    // Compute the allowed locality level: if maxLocality is NO_PREF, the allowed
    // level is NO_PREF; otherwise it is the more local of maxLocality and the
    // level returned by getAllowedLocalityLevel
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    // dequeueTask finds, for the given executor, host and locality level, the
    // index of the task to run, its actual locality, and whether it is a
    // speculative copy, returned as a triple
    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Found a task; do some bookkeeping and return a task description
      // Look up the task by its index
      val task = tasks(index)
      // Generate a new task ID
      val taskId = sched.newTaskId()
      // Increase the number of running copies of this task
      copiesRunning(index) += 1
      // Get the attempt number of this task
      val attemptNum = taskAttempts(index).size
      // Create the task attempt info
      val info = new TaskInfo(taskId, index, attemptNum, curTime,
        execId, host, taskLocality, speculative)
      // Record the mapping between the task and its attempt info
      taskInfos(taskId) = info
      taskAttempts(index) = info :: taskAttempts(index)
      // Update our locality level for delay scheduling;
      // NO_PREF will not affect the variables related to delay scheduling
      if (maxLocality != TaskLocality.NO_PREF) {
        // Record the task's locality level and set the last launch time to now
        currentLocalityIndex = getLocalityIndex(taskLocality)
        lastLaunchTime = curTime
      }
      // Serialize the task
      val serializedTask: ByteBuffer = try {
        ser.serialize(task)
      } catch {
        // If the task cannot be serialized, then there's no point to re-attempt the task,
        // as it will always fail. So just abort the whole task-set.
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          abort(s"$msg Exception during serialization: $e")
          throw new TaskNotSerializableException(e)
      }
      // Warn if the serialized task exceeds the recommended size
      if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }
      // Add the task ID to the set of running tasks and increase the number of
      // running tasks recorded in the scheduling pool
      addRunningTask(taskId)
      // We used to log the time it takes to serialize the task, but task size is already
      // a good proxy to task serialization time.
      // Generate the task name
      val taskName = s"task ${info.id} in stage ${taskSet.id}"
      logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
        s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
      // Posts a BeginEvent to DAGSchedulerEventProcessLoop:
      //   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
      //     eventProcessLoop.post(BeginEvent(task, taskInfo))
      //   }
      sched.dagScheduler.taskStarted(task, info)
      // Create and return the TaskDescription
      new TaskDescription(
        taskId,
        attemptNum,
        execId,
        taskName,
        index,
        task.partitionId,
        addedFiles,
        addedJars,
        task.localProperties,
        serializedTask)
    }
  } else {
    None
  }
}
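The getAllowedLocalityLevel call above implements delay scheduling: the manager stays at the current locality level until the configured wait (spark.locality.wait, 3 s by default) elapses without a launch, then falls back one level. Below is a self-contained sketch of that core loop; the names are illustrative, and the bookkeeping that drops levels with no remaining pending tasks is omitted:
object DelaySchedulingSketch {
  val myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaits = Array(3000L, 3000L, 3000L, 0L) // per-level wait in ms
  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  def allowedLocalityLevel(curTime: Long): String = {
    // Fall back one level each time the wait for the current level has fully
    // elapsed since the last task launch
    while (currentLocalityIndex < myLocalityLevels.length - 1 &&
           curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    myLocalityLevels(currentLocalityIndex)
  }

  def main(args: Array[String]): Unit = {
    println(allowedLocalityLevel(1000))  // PROCESS_LOCAL: 3 s wait not yet expired
    println(allowedLocalityLevel(4000))  // NODE_LOCAL: one wait has expired
    println(allowedLocalityLevel(10000)) // ANY: all waits have expired
  }
}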