Spark Source Code Walkthrough (1): The Master Startup Process
This article walks through how the Master starts when Spark is deployed in Standalone mode.
1. Startup Script Analysis
The Master's startup begins with the start-master.sh script.
First, the script loads configuration and environment variables:
. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"随后,调用了spark-daemon.sh,注意这里指定了启动的类
CLASS="org.apache.spark.deploy.master.Master"
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \ --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \ $ORIGINAL_ARGS
spark-daemon.sh in turn calls spark-class:
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
With the arguments filled in, this becomes:
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080 >> "$log" 2>&1 < /dev/null &
And the final launch command is:
/Library/Java/JavaVirtualMachines/jdk1.8.0_40.jdk/Contents/Home/jre/bin/java -cp /Users/didi/spark_project/spark-1.6.2-bin-hadoop2.6/conf/:/Users/didi/spark_project/spark-1.6.2-bin-hadoop2.6/lib/spark-assembly-1.6.2-hadoop2.6.0.jar:/Users/didi/spark_project/spark-1.6.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/Users/didi/spark_project/spark-1.6.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/Users/didi/spark_project/spark-1.6.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Xms1g -Xmx1g org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
2. Startup Code Analysis
2.1 How the Netty server starts, how a RequestMessage is handed from Netty to the Master, and how the Master's onStart() method gets called
The Master starts from the main method of its companion object:
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}
MasterArguments simply parses the command-line arguments; the interesting part is startRpcEnvAndEndpoint:
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
The two calls that deserve attention here are RpcEnv.create() and rpcEnv.setupEndpoint(). Let's start with what RpcEnv.create() does:
def create(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  getRpcEnvFactory(conf).create(config)
}
RpcEnvFactory has two implementations; in Spark 1.6 the default is NettyRpcEnvFactory.
Here is its create method:
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}
Utils.startServiceOnPort() ends up calling nettyEnv.startServer(actualPort) to start the server.
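Conceptually, Utils.startServiceOnPort just tries to bind the service on the requested port and, if that port is already taken, retries on the following ports a bounded number of times. The snippet below is only a rough sketch of that idea, not Spark's actual implementation, and the retry count is an assumption:

import java.net.BindException

// Illustrative stand-in for the retry-on-port idea: try startPort, and on a bind
// failure retry on startPort + 1, startPort + 2, ... up to maxRetries attempts.
def startServiceOnPort[T](startPort: Int,
                          startService: Int => (T, Int),
                          maxRetries: Int = 16): (T, Int) = {
  var offset = 0
  while (offset <= maxRetries) {
    val tryPort = startPort + offset
    try {
      return startService(tryPort) // e.g. actualPort => nettyEnv.startServer(actualPort)
    } catch {
      case _: BindException => offset += 1 // port already in use, try the next one
    }
  }
  throw new BindException(s"Could not bind after $maxRetries retries starting at port $startPort")
}

In the Master's case the startService argument corresponds to the startNettyRpcEnv closure shown above.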
def startServer(port: Int): Unit = {
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  server = transportContext.createServer(host, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
transportContext.createServer() is what ultimately brings up the Netty server that the Master listens on.
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
TransportServer calls init() in its constructor:
private void init(String hostToBind, int portToBind) {
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
    new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port :" + port);
}
init() bootstraps and binds the Netty server, so at this point the Master's network endpoint is up. To understand how the Master communicates over Netty, let's continue with context.initializePipeline():
public TransportChannelHandler initializePipeline(
    SocketChannel channel,
    RpcHandler channelRpcHandler) {
  try {
    TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
    channel.pipeline()
      .addLast("encoder", encoder)
      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
      .addLast("decoder", decoder)
      .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
      // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
      // would require more logic to guarantee if this were not part of the same event loop.
      .addLast("handler", channelHandler);
    return channelHandler;
  } catch (RuntimeException e) {
    logger.error("Error while initializing Netty pipeline", e);
    throw e;
  }
}
initializePipeline() wires the encoder, frame decoder, decoder and the TransportChannelHandler into the channel pipeline; the handler itself is built in createChannelHandler():
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  TransportClient client = new TransportClient(channel, responseHandler);
  TransportRequestHandler requestHandler =
    new TransportRequestHandler(channel, client, rpcHandler);
  return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), closeIdleConnections);
}
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else {
    responseHandler.handle((ResponseMessage) request);
  }
}
channelRead0() shows that, after decoding, inbound data is handed to the requestHandler. Let's follow its handle() method:
public void handle(RequestMessage request) {
  if (request instanceof ChunkFetchRequest) {
    processFetchRequest((ChunkFetchRequest) request);
  } else if (request instanceof RpcRequest) {
    processRpcRequest((RpcRequest) request);
  } else if (request instanceof OneWayMessage) {
    processOneWayMessage((OneWayMessage) request);
  } else if (request instanceof StreamRequest) {
    processStreamRequest((StreamRequest) request);
  } else {
    throw new IllegalArgumentException("Unknown request type: " + request);
  }
}
For an RpcRequest:
private void processRpcRequest(final RpcRequest req) {
  try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
      }

      @Override
      public void onFailure(Throwable e) {
        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
      }
    });
  } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  } finally {
    req.body().release();
  }
}
So the request is ultimately handled by the rpcHandler, which is created when the NettyRpcEnv is initialized:
private val transportContext =
  new TransportContext(transportConf, new NettyRpcHandler(dispatcher, this, streamManager))
Next, NettyRpcHandler's receive() method:
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
The message ends up with the dispatcher, so let's see how the dispatcher handles it:
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val shouldCallOnStop = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped || data == null) {
      true
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      false
    }
  }
  if (shouldCallOnStop) {
    // We don't need to call `onStop` in the `synchronized` block
    val error = if (stopped) {
        new IllegalStateException("RpcEnv already stopped.")
      } else {
        new SparkException(s"Could not find $endpointName or it has been stopped.")
      }
    callbackIfStopped(error)
  }
}
The key lines are:
data.inbox.post(message)
receivers.offer(data)
This puts the message into a queue and signals that there is work to process; the details are covered further below.
At this point we can lay out the call path for handling one Netty request:
TransportChannelHandler --> TransportRequestHandler --> RpcHandler [NettyRpcHandler] --> Dispatcher --> message queue
Now, back to the Master's startRpcEnvAndEndpoint() method.
Once RpcEnv.create() has finished, rpcEnv.setupEndpoint() is called.
RpcEnv likewise has two subclasses, AkkaRpcEnv and NettyRpcEnv; here again the default is NettyRpcEnv.
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
Before going further, note where the dispatcher member comes from:
private val dispatcher: Dispatcher = new Dispatcher(this)
Now the registerRpcEndpoint method:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}
Here the RpcEndpoint (the Master) is put into the endpoints map; exactly how it is used is analyzed later.
Note how EndpointData is constructed:
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
An Inbox is created here; it will come up again shortly.
Also pay attention to the comment // for the OnStart message: once that receivers.offer(data) line executes, the Master's onStart() method will be invoked. Let's work out exactly how that happens.
private val receivers = new LinkedBlockingQueue[EndpointData]
Seeing this line, attentive readers will already recognize a producer-consumer pattern. Next, let's find out who consumes the EndpointData entries in the receivers queue.
private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
This is the consumer. So when does this consumer actually get started?
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    Runtime.getRuntime.availableProcessors())
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
So the MessageLoops are started while the threadpool member is initialized, i.e. while new Dispatcher() runs. The Dispatcher is a member of NettyRpcEnv and is created when NettyRpcEnv is initialized, and NettyRpcEnv itself is created when RpcEnv.create() executes.
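ThreadUtils.newDaemonFixedThreadPool is essentially a thin convenience around the JDK executors. A simplified sketch of what such a helper amounts to (illustrative only, not Spark's actual ThreadUtils):

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

// Illustrative stand-in: a fixed-size pool whose threads are daemons and carry a readable
// name prefix such as "dispatcher-event-loop".
def newDaemonFixedThreadPool(numThreads: Int, prefix: String): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true) // daemon threads never keep the JVM alive on their own
      t
    }
  }
  Executors.newFixedThreadPool(numThreads, factory).asInstanceOf[ThreadPoolExecutor]
}

Because the dispatcher threads are daemons, they die together with the JVM instead of blocking its shutdown.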
Continuing with the consumer: its real work is done by data.inbox.process(), so next we look at the Inbox's process() method:
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    if (!enableConcurrent && numActiveThreads != 0) {
      return
    }
    message = messages.poll()
    if (message != null) {
      numActiveThreads += 1
    } else {
      return
    }
  }
  while (true) {
    safelyCall(endpoint) {
      message match {
        case RpcMessage(_sender, content, context) =>
          try {
            endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })
          } catch {
            case NonFatal(e) =>
              context.sendFailure(e)
              // Throw the exception -- this exception will be caught by the safelyCall function.
              // The endpoint's onError function will be called.
              throw e
          }

        case OneWayMessage(_sender, content) =>
          endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })

        case OnStart =>
          endpoint.onStart()
          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
            inbox.synchronized {
              if (!stopped) {
                enableConcurrent = true
              }
            }
          }

        case OnStop =>
          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
          assert(activeThreads == 1,
            s"There should be only a single active thread but found $activeThreads threads.")
          dispatcher.removeRpcEndpointRef(endpoint)
          endpoint.onStop()
          assert(isEmpty, "OnStop should be the last message")

        case RemoteProcessConnected(remoteAddress) =>
          endpoint.onConnected(remoteAddress)

        case RemoteProcessDisconnected(remoteAddress) =>
          endpoint.onDisconnected(remoteAddress)

        case RemoteProcessConnectionError(cause, remoteAddress) =>
          endpoint.onNetworkError(cause, remoteAddress)
      }
    }

    inbox.synchronized {
      // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
      // every time.
      if (!enableConcurrent && numActiveThreads != 1) {
        // If we are not the only one worker, exit
        numActiveThreads -= 1
        return
      }
      message = messages.poll()
      if (message == null) {
        numActiveThreads -= 1
        return
      }
    }
  }
}
Here we can indeed see endpoint.onStart() being called. The remaining question is when the OnStart object is put into the messages queue:
inbox.synchronized {
  messages.add(OnStart)
}
This runs when the Inbox is constructed, so now we know how the Master's onStart() method ends up being called. Before looking at onStart() itself, let's close the loop on the earlier question: how does a request ultimately get handled by the Master?
We derived the following call chain above:
TransportChannelHandler --> TransportRequestHandler --> RpcHandler [NettyRpcHandler] --> Dispatcher --> message queue
That chain is really the producer side of the pattern: it puts the message into a queue and wakes up a consumer to process it.
val data = endpoints.get(endpointName)
data.inbox.post(message)
receivers.offer(data)
In the end an Endpoint (here, the Master) processes the RequestMessage, so the full path of a message from arrival to being handled by the Master is:
TransportChannelHandler --> TransportRequestHandler --> RpcHandler [NettyRpcHandler] --> Dispatcher --> message queue --> RpcEndpoint [Master]
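To make the whole mechanism concrete, here is a self-contained toy version of the dispatcher/inbox machinery described in this section. The names deliberately mirror Spark's, but this is an illustrative sketch rather than Spark's code, and it omits the locking the real Inbox uses to limit concurrent processing:

import java.util.concurrent.{ConcurrentHashMap, Executors, LinkedBlockingQueue}

sealed trait InboxMessage
case object OnStart extends InboxMessage
case class RpcMessage(content: Any) extends InboxMessage

trait ToyEndpoint {
  def onStart(): Unit
  def receive(content: Any): Unit
}

// Each endpoint gets an inbox; the inbox is seeded with OnStart when it is constructed,
// which is why registering an endpoint is enough to get its onStart() called.
class ToyInbox(endpoint: ToyEndpoint) {
  private val messages = new LinkedBlockingQueue[InboxMessage]()
  messages.add(OnStart) // "for the OnStart message"

  def post(message: InboxMessage): Unit = messages.add(message)

  def process(): Unit = {
    var message = messages.poll()
    while (message != null) {
      message match {
        case OnStart             => endpoint.onStart()
        case RpcMessage(content) => endpoint.receive(content)
      }
      message = messages.poll()
    }
  }
}

class ToyDispatcher(numThreads: Int) {
  private case class EndpointData(name: String, inbox: ToyInbox)

  private val endpoints = new ConcurrentHashMap[String, EndpointData]()
  private val receivers = new LinkedBlockingQueue[EndpointData]() // "which inbox has work to do"

  // The MessageLoop consumers: take an EndpointData off the queue and drain its inbox.
  private val pool = Executors.newFixedThreadPool(numThreads)
  (1 to numThreads).foreach { _ =>
    pool.execute(new Runnable {
      override def run(): Unit = while (true) receivers.take().inbox.process()
    })
  }

  // Registering an endpoint enqueues its (OnStart-seeded) inbox, so onStart() runs right away.
  def registerEndpoint(name: String, endpoint: ToyEndpoint): Unit = {
    val data = EndpointData(name, new ToyInbox(endpoint))
    endpoints.put(name, data)
    receivers.offer(data)
  }

  // What postMessage boils down to: put the message in the inbox, then mark the inbox runnable.
  def postMessage(name: String, content: Any): Unit = {
    val data = endpoints.get(name)
    data.inbox.post(RpcMessage(content))
    receivers.offer(data)
  }
}

Registering a ToyEndpoint triggers its onStart() on one of the loop threads, and a later postMessage ends up in receive(); this mirrors the path a RequestMessage takes before the Master's receive/receiveAndReply is invoked.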
2.2 The Master's onStart() Method
Now let's look at the Master's onStart() method:
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory = new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory = new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
Besides starting the web UI, the REST server, the metrics systems and other services, onStart() also sets up the worker-timeout check and HA. The worker-timeout check is straightforward, so let's focus on the HA part. The code has three branches; here we analyze the ZooKeeper-based HA.
In the ZooKeeper mode, the Master serializes its applications, workers and drivers into ZooKeeper nodes, so that if the active Master dies, a standby Master can read that data back from ZooKeeper and no state information is lost.
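For illustration only, the sketch below shows the idea behind a ZooKeeper-backed persistence engine, using Apache Curator directly. The class name, znode layout and connection handling here are assumptions for the sketch, not Spark's ZooKeeperPersistenceEngine:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// Illustrative ZooKeeper-backed state store: every app/worker/driver is written as a
// child znode, and a recovering Master reads the bytes back.
class ZkStateStore(zkConnect: String, rootPath: String) {
  private val zk =
    CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  zk.start()

  def persist(name: String, bytes: Array[Byte]): Unit =
    zk.create().creatingParentsIfNeeded().forPath(s"$rootPath/$name", bytes)

  def unpersist(name: String): Unit =
    zk.delete().forPath(s"$rootPath/$name")

  def readAll(prefix: String): Seq[Array[Byte]] = {
    import scala.collection.JavaConverters._
    zk.getChildren.forPath(rootPath).asScala
      .filter(_.startsWith(prefix)) // e.g. "app_", "worker_", "driver_"
      .map(child => zk.getData.forPath(s"$rootPath/$child"))
      .toSeq
  }
}

Spark's actual engine does essentially this with serialized ApplicationInfo, WorkerInfo and DriverInfo objects, removing the corresponding znodes as applications and workers go away.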
Now for the election and failover logic. Most of it lives in the ZooKeeperLeaderElectionAgent class, whose start() method is called during initialization:
private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}
First, LeaderLatch creates an ephemeral sequential node in ZooKeeper:
client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
  .inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
It then fetches all children of the latch path and sorts them. If the node created by the current Master sorts first, this Master becomes the active one; otherwise it watches the node ranked just ahead of its own, and when that node changes it resets and tries the election again.
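As a minimal usage sketch of the LeaderLatch recipe itself (illustrative; the connection string and latch path are placeholders), the listener callbacks below correspond to the notifications that ZooKeeperLeaderElectionAgent receives:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderLatchExample extends App {
  // Connection string and latch path are assumptions for this sketch.
  val zk = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3))
  zk.start()

  val latch = new LeaderLatch(zk, "/spark/leader_election")
  latch.addListener(new LeaderLatchListener {
    // Fired when this participant's sequential node becomes the smallest child;
    // Spark's agent reacts by having the Master handle ElectedLeader.
    override def isLeader(): Unit = println("elected leader")

    // Fired when leadership is lost (e.g. the ZooKeeper session expires);
    // Spark's agent reacts by having the Master handle RevokedLeadership.
    override def notLeader(): Unit = println("leadership revoked")
  })
  latch.start()
}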
When the Master's role changes, it is notified: its electedLeader() or revokedLeadership() method is called. Note that if revokedLeadership() is invoked, the Master deliberately shuts itself down (this happens, for example, when the Master loses its connection to ZooKeeper).
case RevokedLeadership => {
  logError("Leadership has been revoked -- master shutting down.")
  System.exit(0)
}
Now let's see what the Master does once it becomes the active one:
case ElectedLeader => {
  val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
  state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
    RecoveryState.ALIVE
  } else {
    RecoveryState.RECOVERING
  }
  logInfo("I have been elected leader! New state: " + state)
  if (state == RecoveryState.RECOVERING) {
    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }
}
As the logic shows, on a fresh start (nothing persisted) the Master goes straight to the ALIVE state; otherwise it enters the RECOVERING state.
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any drivers associated with now-lost workers
    // will be re-launched when we detect that the worker is missing.
    drivers += driver
  }

  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker)
      worker.state = WorkerState.UNKNOWN
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
Recovery itself is fairly simple: the persisted information is read back from ZooKeeper and restored into the Master's memory, and the recovered drivers and workers are notified via MasterChanged.
The above is a brief analysis of the Master startup process. Corrections and discussion are very welcome (QQ: 1037727037).