Spark 2.4.0 Source Code: TaskScheduler

  Overview

  TaskScheduler defines the interface that task scheduling must follow. At present Spark ships only one implementation, TaskSchedulerImpl, which receives the TaskSets sent by the DAGScheduler, allocates resources to tasks according to the resource scheduling algorithm, and submits the tasks to executors for execution.

  TaskSchedulerImpl relies on TaskSetManager to implement speculative execution and task locality. Speculative execution means that when a task is found to be running slowly, a copy of it is launched on another executor and whichever copy finishes first supplies the result, so a single slow task does not hold back the whole job. The task locality algorithm tries to send a task to the executor closest to the data the task will process, reducing network transfer. Spark currently supports five locality levels: PROCESS_LOCAL (same process), NODE_LOCAL (same node), NO_PREF (no preference), RACK_LOCAL (same rack), and ANY (anywhere).
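  For reference, these five levels are defined in org.apache.spark.scheduler.TaskLocality as an ordered enumeration (smaller means more local), roughly as follows:

object TaskLocality extends Enumeration {
  // Ordered from most local to least local
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  // A task constrained to `constraint` may run at locality `condition` only if condition <= constraint
  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}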

  TaskSchedulerImpl also depends on a backend interface, SchedulerBackend; allocating resources to tasks is actually carried out by this backend.
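  For context, SchedulerBackend is a small trait; abridged, its Spark 2.4 definition looks roughly like this:

private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  // Ask the backend to offer its free resources to TaskSchedulerImpl and launch tasks on them
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  def killTask(taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit =
    throw new UnsupportedOperationException

  def isReady(): Boolean = true
  def applicationId(): String = appId
  def applicationAttemptId(): Option[String] = None
  // ...
}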

  Initialization and startup of TaskSchedulerImpl

  The SparkContext created by a Spark application constructs a TaskSchedulerImpl internally and calls its initialize and start methods.
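  For local mode, the wiring inside SparkContext looks roughly like the following (abridged from SparkContext and SparkContext.createTaskScheduler; details vary by deploy mode):

// Inside SparkContext: create the scheduler and backend for the given master, then start them
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
// ...
_taskScheduler.start()

// Inside createTaskScheduler, for master = "local":
case "local" =>
  val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
  val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
  scheduler.initialize(backend)
  (backend, scheduler)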

  rootPool, shown below, is one of TaskSchedulerImpl's fields; the root scheduling pool is created when TaskSchedulerImpl is constructed. TaskSchedulerImpl's task scheduling relies on the scheduling pool (Pool): every task that needs to be scheduled is placed into a pool, the pool schedules TaskSets according to the scheduling algorithm (FIFO/FAIR), and the scheduled TaskSets are handed back to TaskSchedulerImpl for resource scheduling.

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
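  Which algorithm the pool uses is controlled by the spark.scheduler.mode property (FIFO by default); a minimal sketch of switching to fair scheduling (the application name is just a hypothetical placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")     // hypothetical app name
  .set("spark.scheduler.mode", "FAIR")   // FIFO (default) or FAIR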

   The initialize and start methods

def initialize(backend: SchedulerBackend) {
    // Assign the SchedulerBackend created inside SparkContext to the backend field
    this.backend = backend
    // Create the appropriate schedulable builder for the configured schedulingMode
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
      }
    }
    // Build the scheduling pools
    schedulableBuilder.buildPools()
  }
  
override def start() {
    // Start the SchedulerBackend
    backend.start()
    // If the application is not running in local mode and speculative execution is enabled,
    // schedule a timer that checks for speculatable tasks every SPECULATION_INTERVAL_MS (100ms)
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          // Check for tasks eligible for speculative execution
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
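  A minimal sketch of the speculation settings that start() consults (the property names are standard Spark configs; the values shown are the defaults, except spark.speculation itself, which defaults to false):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")            // enable speculative execution (default false)
  .set("spark.speculation.interval", "100ms")  // how often to check for speculatable tasks
  .set("spark.speculation.multiplier", "1.5")  // how many times slower than the median counts as slow
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating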

  Task submission

  When DAGScheduler submits a TaskSet from submitMissingTasks, it internally calls taskScheduler.submitTasks, implemented as follows:

override def submitTasks(taskSet: TaskSet) {
    // Get the tasks of this TaskSet
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      // Conflict check: taskSetsByStageIdAndAttempt must not already hold a different active TaskSet for this stage
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${
            stageTaskSets.toSeq.map {
              _._2.taskSet.id
            }.mkString(",")
          }")
      }
      
      // Add the newly created TaskSetManager to the pool built by the schedulable builder
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      
      // If not running in local mode and no task has been received yet, start a timer that checks
      // TaskSchedulerImpl for starvation every STARVATION_TIMEOUT_MS
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      // Mark that TaskSchedulerImpl has received tasks
      hasReceivedTask = true
    }
    // Allocate resources to the tasks and launch them
    backend.reviveOffers()
  }

  The reviveOffers() method of SchedulerBackend is what allocates resources to tasks and launches them. This backend is not created by TaskSchedulerImpl itself: when SparkContext is created, a different SchedulerBackend implementation is constructed depending on the deploy mode and passed to TaskSchedulerImpl.

  Taking local mode as an example, LocalSchedulerBackend's reviveOffers method actually sends a ReviveOffers message to LocalEndpoint, which then calls its own reviveOffers method; that method in turn calls TaskSchedulerImpl's resourceOffers to allocate resources to tasks and finally calls Executor.launchTask to load and attempt to run each task. The implementation is as follows:

def reviveOffers() {
    // Create a sequence holding a single WorkerOffer whose executor id is localExecutorId ("driver"),
    // host is localExecutorHostname ("localhost") and cores is the current freeCores
    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
      Some(rpcEnv.address.hostPort)))
    // Call TaskSchedulerImpl.resourceOffers to allocate resources to tasks
    for (task <- scheduler.resourceOffers(offers).flatten) {
      // Deduct CPUS_PER_TASK from the free CPU cores
      freeCores -= scheduler.CPUS_PER_TASK
      // Call executor.launchTask to load and run the task
      executor.launchTask(executorBackend, task)
    }
  }
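  The message hop between LocalSchedulerBackend and LocalEndpoint mentioned above looks roughly like this in LocalSchedulerBackend.scala (abridged):

// LocalSchedulerBackend just forwards the request to the endpoint
override def reviveOffers() {
  localEndpoint.send(ReviveOffers)
}

// LocalEndpoint handles the message and, whenever a task finishes, frees its core and revives again
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    reviveOffers()

  case StatusUpdate(taskId, state, serializedData) =>
    scheduler.statusUpdate(taskId, state, serializedData)
    if (TaskState.isFinished(state)) {
      freeCores += scheduler.CPUS_PER_TASK
      reviveOffers()
    }

  case KillTask(taskId, interruptThread, reason) =>
    executor.killTask(taskId, interruptThread, reason)
}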

  Resource allocation

  After TaskSchedulerImpl receives the sequence of WorkerOffer case classes it first does some preprocessing: it updates the host-to-executor and host-to-rack mappings used to compute task locality, randomly shuffles the WorkerOffers so that tasks are spread evenly across worker nodes, counts the resources available on each worker, and so on. It then calls its own resourceOfferSingleTaskSet method to offer the resources to each TaskSet.

  resourceOfferSingleTaskSet reads the information carried by each WorkerOffer, such as the executor's id and the offer's host, then checks the offer's CPUs; if the available CPUs are at least the number a task needs (CPUS_PER_TASK), it performs the following steps (a simplified sketch of the method follows the list):

  1. Call TaskSetManager's resourceOffer method to create a TaskDescription for a pending task at the given maximum locality level;

  2. Append the TaskDescription to the tasks array;

  3. Update the caches mapping the task's id to its TaskSet and to its executor;

  4. Recompute the CPUs: subtract the CPUs used by the task from the WorkerOffer's available CPUs. Finally return launchedTask, i.e. whether any task in this TaskSet was assigned resources.
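  A simplified sketch of resourceOfferSingleTaskSet in TaskSchedulerImpl implementing the steps above (abridged; the Spark 2.4 source also carries an extra parameter used for barrier scheduling):

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Step 1: ask the TaskSetManager for a TaskDescription at the given locality
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Step 2: record the TaskDescription for this offer
          tasks(i) += task
          // Step 3: update the taskId -> TaskSetManager / executor mappings
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          // Step 4: deduct the CPUs the task uses from this offer
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          // Do not offer resources for this TaskSet, but let other TaskSets be submitted
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          return launchedTask
      }
    }
  }
  launchedTask
}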

  The implementation of TaskSetManager.resourceOffer:

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    // Check whether this offer's host or executor is blacklisted for the TaskSet
    val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
      blacklist.isNodeBlacklistedForTaskSet(host) ||
        blacklist.isExecutorBlacklistedForTaskSet(execId)
    }
    // Proceed only if this TaskSetManager is not a zombie and neither the host nor the executor is blacklisted
    if (!isZombie && !offerBlacklisted) {
      // Current time and the maximum locality level allowed for this offer
      val curTime = clock.getTimeMillis()
      var allowedLocality = maxLocality

      // Compute the allowed locality level: if maxLocality is NO_PREF the allowed level is NO_PREF;
      // otherwise it is the more local (smaller) of maxLocality and the level returned by getAllowedLocalityLevel
      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      // dequeueTask finds, for the given executor, host and locality level, the index of a task to run,
      // its actual locality and whether it is a speculative copy, returned as a triple
      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
        // Found a task; do some bookkeeping and return a task description
        // Look up the task by its index
        val task = tasks(index)
        // Generate a new task id
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        // Increase the number of running copies of this task
        copiesRunning(index) += 1
        // The attempt number of this task
        val attemptNum = taskAttempts(index).size
        // Create the TaskInfo for this attempt
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        // Update the task-id-to-TaskInfo mapping and the attempt history
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        // If the maximum locality level is not NO_PREF
        if (maxLocality != TaskLocality.NO_PREF) {
          // Record the locality index of this task and set the last launch time to the current time
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        // Serialize the task
        val serializedTask: ByteBuffer = try {
          ser.serialize(task)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        
        // Warn if the serialized task is very large
        if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
          !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        
        // Add the task id to the running set and increase the running-task count recorded in the pool
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        // Build the task name
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
          s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")
  
        /**
          * def taskStarted(task: Task[_], taskInfo: TaskInfo) {
          *     eventProcessLoop.post(BeginEvent(task, taskInfo))
          * }
          * Posts a BeginEvent to the DAGSchedulerEventProcessLoop
          */
        sched.dagScheduler.taskStarted(task, info)
        // Create and return the TaskDescription
        new TaskDescription(
          taskId,
          attemptNum,
          execId,
          taskName,
          index,
          task.partitionId,
          addedFiles,
          addedJars,
          task.localProperties,
          serializedTask)
      }
    } else {
      None
    }
  }
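  The getAllowedLocalityLevel call used above implements delay scheduling: a TaskSet is allowed to fall back to a less local level only after waiting a configurable time at the current level. A minimal sketch of the relevant settings (the property names are standard Spark configs; 3s is the documented default):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before falling back one locality level
  .set("spark.locality.wait.process", "3s")  // wait at PROCESS_LOCAL before trying NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait at NODE_LOCAL before trying RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait at RACK_LOCAL before trying ANY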
 
