Spark源码分析之二:Job的调度模型与运行反馈

 在《Spark源码分析之Job提交运行总流程概述》一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段:

        1、Job的调度模型与运行反馈;

        2、Stage划分;

        3、Stage提交:对应TaskSet的生成。

        今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈。

Spark源码分析之二:Job的调度模型与运行反馈

        首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行。入口方法为DAGScheduler的runJon()方法。代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. /** 
  2.    * Run an action job on the given RDD and pass all the results to the resultHandler function as 
  3.    * they arrive. 
  4.    * 
  5.    * @param rdd target RDD to run tasks on 
  6.    * @param func a function to run on each partition of the RDD 
  7.    * @param partitions set of partitions to run on; some jobs may not want to compute on all 
  8.    *   partitions of the target RDD, e.g. for operations like first() 
  9.    * @param callSite where in the user program this job was called 
  10.    * @param resultHandler callback to pass each result to 
  11.    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 
  12.    * 
  13.    * @throws Exception when the job fails 
  14.    */  
  15.   def runJob[T, U](  
  16.       rdd: RDD[T],  
  17.       func: (TaskContext, Iterator[T]) => U,  
  18.       partitions: Seq[Int],  
  19.       callSite: CallSite,  
  20.       resultHandler: (Int, U) => Unit,  
  21.       properties: Properties): Unit = {  
  22.         
  23.     // 开始时间  
  24.     val start = System.nanoTime  
  25.       
  26.     // 调用submitJob()方法,提交Job,返回JobWaiter  
  27.     // rdd为最后一个rdd,即target RDD to run tasks on  
  28.     // func为该rdd上每个分区需要执行的函数,a function to run on each partition of the RDD  
  29.     // partitions为该rdd上需要执行操作的分区集合,set of partitions to run on  
  30.     // callSite为用户程序job被调用的地方,where in the user program this job was called  
  31.     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)  
  32.       
  33.     // JobWaiter调用awaitResult()方法等待结果  
  34.     waiter.awaitResult() match {  
  35.       case JobSucceeded => // Job运行成功  
  36.         logInfo("Job %d finished: %s, took %f s".format  
  37.           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
  38.       case JobFailed(exception: Exception) =>// Job运行失败  
  39.         logInfo("Job %d failed: %s, took %f s".format  
  40.           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
  41.         // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.  
  42.         val callerStackTrace = Thread.currentThread().getStackTrace.tail  
  43.         exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)  
  44.         throw exception  
  45.     }  
  46.   }  

        runJob()方法就做了三件事:

        首先,获取开始时间,方便最后计算Job执行时间;

        其次,调用submitJob()方法,提交Job,返回JobWaiter类型的对象waiter;

        最后,waiter调用JobWaiter的awaitResult()方法等待Job运行结果,这个运行结果就俩:JobSucceeded代表成功,JobFailed代表失败。

        awaitResult()方法通过轮询标志位_jobFinished,如果为false,则调用this.wait()继续等待,否则说明Job运行完成,返回JobResult,其代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. def awaitResult(): JobResult = synchronized {  
  2.       
  3.     // 循环,如果标志位_jobFinished为false,则一直循环,否则退出,返回JobResult  
  4.     while (!_jobFinished) {  
  5.       this.wait()  
  6.     }  
  7.     return jobResult  
  8.   }  

        而这个标志位_jobFinished是在Task运行完成后,如果已完成Task数目等于总Task数目时,或者整个Job运行失败时设置的,随着标志位的设置,Job运行结果jobResult也同步进行设置,代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. // 任务运行完成  
  2.   override def taskSucceeded(index: Int, result: Any): Unit = synchronized {  
  3.     if (_jobFinished) {  
  4.       throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")  
  5.     }  
  6.     resultHandler(index, result.asInstanceOf[T])  
  7.     finishedTasks += 1  
  8.     // 已完成Task数目是否等于总Task数目  
  9.     if (finishedTasks == totalTasks) {  
  10.       // 设置标志位_jobFinished为ture  
  11.       _jobFinished = true  
  12.       // 作业运行结果为成功  
  13.       jobResult = JobSucceeded  
  14.       this.notifyAll()  
  15.     }  
  16.   }  
  17.   
  18.   // 作业失败  
  19.   override def jobFailed(exception: Exception): Unit = synchronized {  
  20.     // 设置标志位_jobFinished为ture  
  21.     _jobFinished = true  
  22.     // 作业运行结果为失败  
  23.     jobResult = JobFailed(exception)  
  24.     this.notifyAll()  
  25.   }  



        接下来,看看submitJob()方法,代码定义如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. /** 
  2.    * Submit an action job to the scheduler. 
  3.    * 
  4.    * @param rdd target RDD to run tasks on 
  5.    * @param func a function to run on each partition of the RDD 
  6.    * @param partitions set of partitions to run on; some jobs may not want to compute on all 
  7.    *   partitions of the target RDD, e.g. for operations like first() 
  8.    * @param callSite where in the user program this job was called 
  9.    * @param resultHandler callback to pass each result to 
  10.    * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name 
  11.    * 
  12.    * @return a JobWaiter object that can be used to block until the job finishes executing 
  13.    *         or can be used to cancel the job. 
  14.    * 
  15.    * @throws IllegalArgumentException when partitions ids are illegal 
  16.    */  
  17.   def submitJob[T, U](  
  18.       rdd: RDD[T],  
  19.       func: (TaskContext, Iterator[T]) => U,  
  20.       partitions: Seq[Int],  
  21.       callSite: CallSite,  
  22.       resultHandler: (Int, U) => Unit,  
  23.       properties: Properties): JobWaiter[U] = {  
  24.       
  25.     // Check to make sure we are not launching a task on a partition that does not exist.  
  26.     // 检测rdd分区以确保我们不会在一个不存在的partition上launch一个task  
  27.     val maxPartitions = rdd.partitions.length  
  28.     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>  
  29.       throw new IllegalArgumentException(  
  30.         "Attempting to access a non-existent partition: " + p + ". " +  
  31.           "Total number of partitions: " + maxPartitions)  
  32.     }  
  33.   
  34.     // 为Job生成一个jobId,jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增  
  35.     val jobId = nextJobId.getAndIncrement()  
  36.       
  37.     // 如果partitions大小为0,即没有需要执行任务的分区,快速返回  
  38.     if (partitions.size == 0) {  
  39.       // Return immediately if the job is running 0 tasks  
  40.       return new JobWaiter[U](this, jobId, 0, resultHandler)  
  41.     }  
  42.   
  43.     assert(partitions.size > 0)  
  44.       
  45.     // func转化下,否则JobSubmitted无法接受这个func参数,T转变为_  
  46.     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  
  47.       
  48.     // 创建一个JobWaiter对象  
  49.     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)  
  50.       
  51.     // eventProcessLoop加入一个JobSubmitted事件到事件队列中  
  52.     eventProcessLoop.post(JobSubmitted(  
  53.       jobId, rdd, func2, partitions.toArray, callSite, waiter,  
  54.       SerializationUtils.clone(properties)))  
  55.       
  56.     // 返回JobWaiter  
  57.     waiter  
  58.   }  

        submitJob()方法一共做了5件事情:

        第一,数据检测,检测rdd分区以确保我们不会在一个不存在的partition上launch一个task,并且,如果partitions大小为0,即没有需要执行任务的分区,快速返回;

        第二,为Job生成一个jobId,该jobId为AtomicInteger类型,getAndIncrement()确保了原子操作性,每次生成后都自增;

        第三,将func转化下,否则JobSubmitted无法接受这个func参数,T转变为_;

        第四,创建一个JobWaiter对象waiter,该对象会在方法结束时返回给上层方法,以用来监测Job运行结果;

        第五,将一个JobSubmitted事件加入到事件队列eventProcessLoop中,等待工作线程轮询调度(速度很快)。

        这里,我们有必要研究下事件队列eventProcessLoop,eventProcessLoop为DAGSchedulerEventProcessLoop类型的,在DAGScheduler初始化时被定义并赋值,代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. // 创建DAGSchedulerEventProcessLoop类型的成员变量eventProcessLoop  
  2.   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)  

        DAGSchedulerEventProcessLoop继承自EventLoop,我们先来看看这个EventLoop的定义。

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. /** 
  2.  * An event loop to receive events from the caller and process all events in the event thread. It 
  3.  * will start an exclusive event thread to process all events. 
  4.  * EventLoop用来接收来自调用者的事件并在event thread中除了所有的事件。它将开启一个专门的事件处理线程处理所有的事件。 
  5.  * 
  6.  * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can 
  7.  * handle events in time to avoid the potential OOM. 
  8.  */  
  9. private[spark] abstract class EventLoop[E](name: String) extends Logging {  
  10.     
  11.   // LinkedBlockingDeque类型的事件队列,队列元素为E类型  
  12.   private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  
  13.   
  14.   // 标志位  
  15.   private val stopped = new AtomicBoolean(false)  
  16.   
  17.   // 事件处理线程  
  18.   private val eventThread = new Thread(name) {  
  19.     // 设置为后台线程  
  20.     setDaemon(true)  
  21.   
  22.     override def run(): Unit = {  
  23.       try {  
  24.         // 如果标志位stopped没有被设置为true,一直循环  
  25.         while (!stopped.get) {  
  26.           // 从事件队列中take一条事件  
  27.           val event = eventQueue.take()  
  28.           try {  
  29.             // 调用onReceive()方法进行处理  
  30.             onReceive(event)  
  31.           } catch {  
  32.             case NonFatal(e) => {  
  33.               try {  
  34.                 onError(e)  
  35.               } catch {  
  36.                 case NonFatal(e) => logError("Unexpected error in " + name, e)  
  37.               }  
  38.             }  
  39.           }  
  40.         }  
  41.       } catch {  
  42.         case ie: InterruptedException => // exit even if eventQueue is not empty  
  43.         case NonFatal(e) => logError("Unexpected error in " + name, e)  
  44.       }  
  45.     }  
  46.   
  47.   }  
  48.   
  49.   def start(): Unit = {  
  50.     if (stopped.get) {  
  51.       throw new IllegalStateException(name + " has already been stopped")  
  52.     }  
  53.     // Call onStart before starting the event thread to make sure it happens before onReceive  
  54.     onStart()  
  55.     eventThread.start()  
  56.   }  
  57.   
  58.   def stop(): Unit = {  
  59.     if (stopped.compareAndSet(false, true)) {  
  60.       eventThread.interrupt()  
  61.       var onStopCalled = false  
  62.       try {  
  63.         eventThread.join()  
  64.         // Call onStop after the event thread exits to make sure onReceive happens before onStop  
  65.         onStopCalled = true  
  66.         onStop()  
  67.       } catch {  
  68.         case ie: InterruptedException =>  
  69.           Thread.currentThread().interrupt()  
  70.           if (!onStopCalled) {  
  71.             // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since  
  72.             // it's already called.  
  73.             onStop()  
  74.           }  
  75.       }  
  76.     } else {  
  77.       // Keep quiet to allow calling `stop` multiple times.  
  78.     }  
  79.   }  
  80.   
  81.   /** 
  82.    * Put the event into the event queue. The event thread will process it later. 
  83.    * 将事件加入到时间队列。事件线程过会会处理它。 
  84.    */  
  85.   def post(event: E): Unit = {  
  86.     // 将事件加入到待处理队列  
  87.     eventQueue.put(event)  
  88.   }  
  89.   
  90.   /** 
  91.    * Return if the event thread has already been started but not yet stopped. 
  92.    */  
  93.   def isActive: Boolean = eventThread.isAlive  
  94.   
  95.   /** 
  96.    * Invoked when `start()` is called but before the event thread starts. 
  97.    */  
  98.   protected def onStart(): Unit = {}  
  99.   
  100.   /** 
  101.    * Invoked when `stop()` is called and the event thread exits. 
  102.    */  
  103.   protected def onStop(): Unit = {}  
  104.   
  105.   /** 
  106.    * Invoked in the event thread when polling events from the event queue. 
  107.    * 
  108.    * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked 
  109.    * and cannot process events in time. If you want to call some blocking actions, run them in 
  110.    * another thread. 
  111.    */  
  112.   protected def onReceive(event: E): Unit  
  113.   
  114.   /** 
  115.    * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError` 
  116.    * will be ignored. 
  117.    */  
  118.   protected def onError(e: Throwable): Unit  
  119.   
  120. }  

        我们可以看到,EventLoop实际上就是一个任务队列及其对该队列一系列操作的封装。在它内部,首先定义了一个LinkedBlockingDeque类型的事件队列,队列元素为E类型,其中DAGSchedulerEventProcessLoop存储的则是DAGSchedulerEvent类型的事件,代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. // LinkedBlockingDeque类型的事件队列,队列元素为E类型  
  2.   private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  

        并提供了一个后台线程,专门对事件队列里的事件进行监控,并调用onReceive()方法进行处理,代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. // 事件处理线程  
  2.   private val eventThread = new Thread(name) {  
  3.     // 设置为后台线程  
  4.     setDaemon(true)  
  5.   
  6.     override def run(): Unit = {  
  7.       try {  
  8.         // 如果标志位stopped没有被设置为true,一直循环  
  9.         while (!stopped.get) {  
  10.           // 从事件队列中take一条事件  
  11.           val event = eventQueue.take()  
  12.           try {  
  13.             // 调用onReceive()方法进行处理  
  14.             onReceive(event)  
  15.           } catch {  
  16.             case NonFatal(e) => {  
  17.               try {  
  18.                 onError(e)  
  19.               } catch {  
  20.                 case NonFatal(e) => logError("Unexpected error in " + name, e)  
  21.               }  
  22.             }  
  23.           }  
  24.         }  
  25.       } catch {  
  26.         case ie: InterruptedException => // exit even if eventQueue is not empty  
  27.         case NonFatal(e) => logError("Unexpected error in " + name, e)  
  28.       }  
  29.     }  
  30.   
  31.   }  

        那么如何向队列中添加事件呢?调用其post()方法,传入事件即可。如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. /** 
  2.    * Put the event into the event queue. The event thread will process it later. 
  3.    * 将事件加入到时间队列。事件线程过会会处理它。 
  4.    */  
  5.   def post(event: E): Unit = {  
  6.     // 将事件加入到待处理队列  
  7.     eventQueue.put(event)  
  8.   }  

        言归正传,上面提到,submitJob()方法利用eventProcessLoop的post()方法加入一个JobSubmitted事件到事件队列中,那么DAGSchedulerEventProcessLoop对于JobSubmitted事件是如何处理的呢?我们看它的onReceive()方法,源码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. /** 
  2.    * The main event loop of the DAG scheduler. 
  3.    * DAGScheduler中事件主循环 
  4.    */  
  5.   override def onReceive(event: DAGSchedulerEvent): Unit = {  
  6.     val timerContext = timer.time()  
  7.     try {  
  8.       // 调用doOnReceive()方法,将DAGSchedulerEvent类型的event传递进去  
  9.       doOnReceive(event)  
  10.     } finally {  
  11.       timerContext.stop()  
  12.     }  
  13.   }  

        继续看doOnReceive()方法,代码如下:

[java] view plain copy
 
 Spark源码分析之二:Job的调度模型与运行反馈Spark源码分析之二:Job的调度模型与运行反馈
  1. // 事件处理调度函数  
  2.   private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {  
  3.       
  4.     // 如果是JobSubmitted事件,调用dagScheduler.handleJobSubmitted()方法处理  
  5.     case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>  
  6.       dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)  
  7.   
  8.     // 如果是MapStageSubmitted事件,调用dagScheduler.handleMapStageSubmitted()方法处理  
  9.     case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>  
  10.       dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)  
  11.   
  12.     case StageCancelled(stageId) =>  
  13.       dagScheduler.handleStageCancellation(stageId)  
  14.   
  15.     case JobCancelled(jobId) =>  
  16.       dagScheduler.handleJobCancellation(jobId)  
  17.   
  18.     case JobGroupCancelled(groupId) =>  
  19.       dagScheduler.handleJobGroupCancelled(groupId)  
  20.   
  21.     case AllJobsCancelled =>  
  22.       dagScheduler.doCancelAllJobs()  
  23.   
  24.     case ExecutorAdded(execId, host) =>  
  25.       dagScheduler.handleExecutorAdded(execId, host)  
  26.   
  27.     case ExecutorLost(execId) =>  
  28.       dagScheduler.handleExecutorLost(execId, fetchFailed = false)  
  29.   
  30.     case BeginEvent(task, taskInfo) =>  
  31.       dagScheduler.handleBeginEvent(task, taskInfo)  
  32.   
  33.     case GettingResultEvent(taskInfo) =>  
  34.       dagScheduler.handleGetTaskResult(taskInfo)  
  35.   
  36.     case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>  
  37.       dagScheduler.handleTaskCompletion(completion)  
  38.   
  39.     case TaskSetFailed(taskSet, reason, exception) =>  
  40.       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)  
  41.   
  42.     case ResubmitFailedStages =>  
  43.       dagScheduler.resubmitFailedStages()  
  44.   }  

        对于JobSubmitted事件,我们通过调用DAGScheduler的handleJobSubmitted()方法来处理。

        好了,到这里,第一阶段Job的调度模型与运行反馈大体已经分析完了,至于后面的第二、第三阶段,留待后续博文继续分析吧~

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50667966