【Spark 34】Execution Flow of a User-Submitted Job on a Standalone Cluster in Cluster Deploy Mode


The flow of an application from submission to execution in standalone cluster mode:

  • SparkSubmit submits the program.
  • The spark-submit command runs SparkSubmit's main function, which in turn calls the launch function.
  • In standalone cluster mode the master URL starts with spark:// and deployMode is cluster, so mainClass is resolved to org.apache.spark.deploy.Client; the logic therefore ends up in deploy.Client's main function (see the dispatch sketch after this list).
  • deploy.Client's main function creates a ClientActor.
  • ClientActor runs its preStart method, whose main job is to package the Driver information and send a RequestSubmitDriver request to the Master.
  • The Master handles RequestSubmitDriver by creating the Driver and calling the schedule method.
  • Inside Master.schedule, the launchDriver method sends a LaunchDriver request to a Worker.
  • The Worker handles LaunchDriver by creating a DriverRunner and calling DriverRunner.start.
  • DriverRunner.start calls DriverRunner.launchDriver.
  • launchDriver calls runCommandWithRetry to create the Driver process.
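As a minimal sketch of the dispatch in the third bullet (illustrative only: chooseMainClass is a hypothetical helper, not the actual SparkSubmit API):

object MainClassDispatchSketch {
  // Decide whose main() SparkSubmit will actually launch.
  def chooseMainClass(master: String, deployMode: String, userClass: String): String =
    if (master.startsWith("spark://") && deployMode == "cluster") {
      "org.apache.spark.deploy.Client" // standalone cluster mode goes through deploy.Client
    } else {
      userClass // client mode runs the user's main class in the submitting JVM
    }

  def main(args: Array[String]): Unit = {
    // Prints: org.apache.spark.deploy.Client
    println(chooseMainClass("spark://host:7077", "cluster", "com.example.MyApp"))
  }
}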

Below is a walkthrough of the code that creates the Driver process (a JVM process):

1. DriverRunner.runCommandWithRetry creates the Driver process and, when supervision is enabled, relaunches it on failure:

private[deploy] def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit,
    supervise: Boolean) {
    // Time to wait between submission retries.
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5

    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        if (killed) { return }
        process = Some(command.start()) // command.start() returns a java.lang.Process
        initialize(process.get)
      }

      val processStart = clock.currentTimeMillis()
      val exitCode = process.get.waitFor()
      if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }

      if (supervise && exitCode != 0 && !killed) {
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }

      keepTrying = supervise && exitCode != 0 && !killed
      finalExitCode = Some(exitCode)
    }
  }
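The retry policy above is an exponential back-off with a reset: each failed run doubles the wait, and any run that survives longer than successfulRunDuration (5 seconds) resets the wait to 1 second. A toy simulation of just that policy (illustrative only, not Spark code):

object BackOffSketch {
  var waitSeconds = 1
  val successfulRunDuration = 5 // seconds

  def afterRun(runSeconds: Int, exitCode: Int): Unit = {
    if (runSeconds > successfulRunDuration) waitSeconds = 1 // a long run resets the back-off
    if (exitCode != 0) {
      println(s"re-launching after $waitSeconds s")
      waitSeconds *= 2 // each failure doubles the wait
    }
  }

  def main(args: Array[String]): Unit = {
    afterRun(1, 1) // re-launching after 1 s
    afterRun(1, 1) // re-launching after 2 s
    afterRun(6, 1) // ran longer than 5 s, so reset: re-launching after 1 s
  }
}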

 

2. The command passed in is an object of type ProcessBuilderLike:

private[deploy] object ProcessBuilderLike {
  def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike {
    def start() = processBuilder.start() // delegates to java.lang.ProcessBuilder.start()
    def command = processBuilder.command()
  }
}
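A hypothetical usage sketch of this facade (the command below is illustrative, and ProcessBuilderLike is private[deploy] in Spark, so this only compiles with access to that package):

// Wrap a java.lang.ProcessBuilder and start it through the facade,
// just as DriverRunner does with the driver command.
val pb = new ProcessBuilder("java", "-version")
val commandLike = ProcessBuilderLike(pb) // companion apply() shown above
val proc = commandLike.start()           // delegates to pb.start()
println(s"exit code: ${proc.waitFor()}")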

 

3. The ProcessBuilderLike that is passed in is built on top of a ProcessBuilder, i.e. command = ProcessBuilderLike(builder):

          val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
            sparkHome.getAbsolutePath, substituteVariables, Seq(localJarFilename)) // returns a ProcessBuilder

 

4. The code of CommandUtils.buildProcessBuilder is shown below:

  def buildProcessBuilder(
      command: Command,
      memory: Int,
      sparkHome: String,
      substituteArguments: String => String,
      classPaths: Seq[String] = Seq[String](),
      env: Map[String, String] = sys.env): ProcessBuilder = {
    val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
    val commandSeq = buildCommandSeq(localCommand, memory, sparkHome) // builds the "${JAVA_HOME}/bin/java <options> <mainClass> <arguments>" command line
    val builder = new ProcessBuilder(commandSeq: _*)
    val environment = builder.environment()
    for ((key, value) <- localCommand.environment) {
      environment.put(key, value)
    }
    builder
  }
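The same build-then-overlay-environment pattern in a self-contained form (the extra environment variable below is hypothetical):

object ProcessBuilderSketch {
  def main(args: Array[String]): Unit = {
    val commandSeq = Seq(sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java"), "-version")
    val builder = new ProcessBuilder(commandSeq: _*)
    builder.environment().put("MY_EXTRA_VAR", "value") // overlay an extra env entry (hypothetical)
    builder.inheritIO()                                // forward the child's stdout/stderr
    val exit = builder.start().waitFor()
    println(s"child exited with $exit")
  }
}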

 

5. The buildLocalCommand method

It rewrites the command for the local environment of the machine where the Java process will run: it substitutes placeholders and merges the command's library-path entries into the appropriate environment variable.

  /**
   * Build a command based on the given one, taking into account the local environment
   * of where this command is expected to run, substitute any placeholders, and append
   * any extra class paths.
   */
  private def buildLocalCommand(
      command: Command,
      substituteArguments: String => String,
      classPath: Seq[String] = Seq[String](),
      env: Map[String, String]): Command = {
    val libraryPathName = Utils.libraryPathEnvName
    val libraryPathEntries = command.libraryPathEntries
    val cmdLibraryPath = command.environment.get(libraryPathName)

    val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
      val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
      command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
    } else {
      command.environment
    }

    // The excerpt was cut off here; per the Spark 1.x source, the method ends by
    // returning a new Command with substituted arguments, the merged environment,
    // and the extra class paths appended:
    Command(
      command.mainClass,
      command.arguments.map(substituteArguments),
      newEnvironment,
      command.classPathEntries ++ classPath,
      Seq[String](), // library path already captured in environment variable
      command.javaOpts)
  }
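On Linux, Utils.libraryPathEnvName is LD_LIBRARY_PATH, so the merge above amounts to the following (a minimal sketch with illustrative paths):

import java.io.File

val libraryPathEntries = Seq("/opt/spark/native", "/usr/local/lib") // from the Command
val existing = sys.env.get("LD_LIBRARY_PATH")                       // any pre-existing value
val merged = (libraryPathEntries ++ existing).mkString(File.pathSeparator)
// e.g. "/opt/spark/native:/usr/local/lib:<existing value, if any>"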


6. The buildCommandSeq method:

  private def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
    val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")

    // SPARK-698: do not call the run.cmd script, as process.destroy()
    // fails to kill a process tree on Windows
    
    // builds the "${JAVA_HOME}/bin/java <options> <mainClass> <arguments>" command line
    Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++
    command.arguments
  }
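For a driver launched in standalone cluster mode, the assembled sequence looks roughly like the output of this sketch (paths, options, and arguments are illustrative; in Spark 1.x the driver's mainClass is org.apache.spark.deploy.worker.DriverWrapper):

object BuildCommandSeqSketch {
  def main(args: Array[String]): Unit = {
    val runner = sys.env.get("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
    val javaOpts = Seq("-cp", "/opt/spark/lib/*", "-Xms1024M", "-Xmx1024M") // stand-in for buildJavaOpts
    val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
    val arguments = Seq("<worker-url>", "com.example.MyApp") // command.arguments (illustrative)
    println((Seq(runner) ++ javaOpts ++ Seq(mainClass) ++ arguments).mkString(" "))
  }
}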

 

  • Once the Driver process starts, it runs the main function of the application we wrote.
  • When the SparkContext object is created, two things are built: (1) the scheduler pair, private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master), and (2) the DAG scheduler, dagScheduler = new DAGScheduler(this).
  • Inside createTaskScheduler, a SparkDeploySchedulerBackend object is created; SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend.
  • SparkContext's constructor calls TaskScheduler's start method, which internally calls SparkDeploySchedulerBackend's start method.
  • SparkDeploySchedulerBackend's start method constructs an AppClient object and calls AppClient's start method.
  • AppClient runs its preStart method, which calls registerWithMaster() to register with the Master; the message actually sent to the Master is RegisterApplication.
  • The Master's RegisterApplication handler calls Master's schedule method.
  • Master's schedule method calls launchExecutor.
  • Master's launchExecutor sends a LaunchExecutor message to a Worker.
  • The Worker's LaunchExecutor handler creates an ExecutorRunner; the ExecutorRunner then spawns a new Java process running CoarseGrainedExecutorBackend (it does this with a ProcessBuilder, the same mechanism DriverRunner uses, not reflection).
  • The ExecutorRunner object's start method is called; start calls fetchAndRunExecutor.
  • Part of the logic of fetchAndRunExecutor is shown below:

 

      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
        sparkHome.getAbsolutePath, substituteVariables)

Question: where is appDesc.command defined? It is defined in SparkDeploySchedulerBackend's start method:

 

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", // packaged into the ApplicationDescription; a process running this class is launched later
      args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir)

    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
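For reference, the Command constructed here is, roughly, the following case class from the Spark 1.x deploy package (treat the exact field list as an approximation):

// Approximate shape of org.apache.spark.deploy.Command in Spark 1.x.
case class Command(
    mainClass: String,                 // e.g. org.apache.spark.executor.CoarseGrainedExecutorBackend
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])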


  • CoarseGrainedExecutorBackend is an Actor. It first runs its preStart method, which sends a RegisterExecutor message to the Driver.
  • The Driver here refers to the actor defined in CoarseGrainedSchedulerBackend; when it receives RegisterExecutor, it calls CoarseGrainedSchedulerBackend's makeOffers method.
  • makeOffers calls the launchTasks method to launch tasks.
  • launchTasks loops over all the tasks to submit (originally a TaskSet) and, on each iteration, sends a LaunchTask message to CoarseGrainedExecutorBackend.
  • When CoarseGrainedExecutorBackend handles LaunchTask, it calls Executor's launchTask method.
  • Executor's launchTask method submits the task to the Executor's thread pool:
  def launchTask(
      context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId, taskName, serializedTask) // wrap the task in a Runnable
    runningTasks.put(taskId, tr) // track it so it can be queried or killed later
    threadPool.execute(tr)       // hand it to the executor's thread pool
  }
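The same submit-to-pool pattern in miniature (a standalone sketch: Spark actually uses a daemon cached thread pool, and TaskRunner carries far more state than this Runnable):

import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchTaskSketch {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  // Illustrative stand-in for Executor.launchTask: one Runnable per task,
  // tracked by task id, executed on the pool.
  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new Runnable {
      override def run(): Unit = try body() finally runningTasks.remove(taskId)
    }
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, () => println("task 1 ran"))
    threadPool.shutdown()
  }
}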


Summary

The figure below is a diagram that circulates widely on the web. Working through the flow analysis above shows that the diagram is wrong: step 2 is not RegisterMaster, and registration does not happen until step 5, where the message sent to the Master is RegisterApplication (not RegisterDriver).