Flink 源码(十九):组件通信(二)源码解读

4 RPC

Flink 源码(十九):组件通信(二)源码解读

RPC(本地/远程)调用,底层是通过 Akka 提供的 tell/ask 方法进行通信。
Flink 中 RPC 框架中涉及的主要类:
Flink 源码(十九):组件通信(二)源码解读
4.1 RpcGateway
  Flink 的 RPC 协议通过 RpcGateway 来定义,主要定义通信行为;用于远程调用RpcEndpoint 的某些方法,可以理解为对方的客服端代理。
  若想与远端 Actor 通信,则必须提供地址(ip 和 port),如在 Flink-on-Yarn 模式下,JobMaster 会先启动 ActorSystem,此时 TaskExecutor 的 Container 还未分配,后面与
TaskExecutor 通信时,必须让其提供对应地址。 
Flink 源码(十九):组件通信(二)源码解读
从类继承图可以看到基本上所有组件都实现了 RpcGateway 接口,其代码如下: 
Flink 源码(十九):组件通信(二)源码解读
4.2 RpcEndpoint
  RpcEndpoint 是通信终端,提供 RPC 服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,
其实现了 RpcGateway 接口,其构造函数如下: 
Flink 源码(十九):组件通信(二)源码解读
  构造的时候调用rpcService.startServer()启动RpcServer,进入可以接收处理请求的状态,最后将 RpcServer 绑定到主线程上真正执行起来。
在 RpcEndpoint 中还定义了一些方法如 runAsync(Runnable)、callAsync(Callable, Time)方
  法来执行 Rpc 调用,值得注意的是在 Flink 的设计中,对于同一个 Endpoint,所有的调用都运行在主线程,因此不会有并发问题,当启动 RpcEndpoint/进行 Rpc 调用时,其会委托
RcpServer 进行处理。 
 
4.3 RpcService 和 RpcServer
  RpcService 和 RpcServer 是 RpcEndPoint 的成员变量。
  1)RpcService 是 Rpc 服务的接口,其主要作用如下:
  ⚫ 根据提供的 RpcEndpoint 来启动和停止 RpcServer(Actor);
  ⚫ 根据提供的地址连接到(对方的)RpcServer,并返回一个 RpcGateway;
  ⚫ 延迟/立刻调度 Runnable、Callable;
  在 Flink 中实现类为 AkkaRpcService,是 Akka 的 ActorSystem 的封装,基本可以理解成 ActorSystem 的一个适配器。在 ClusterEntrypoint(JobMaster)和 TaskManagerRunner
(TaskExecutor)启动的过程中初始化并启动。
  AkkaRpcService 中封装了ActorSystem,并保存了ActorRef 到 RpcEndpoint的映射关系。RpcService 跟 RpcGateway 类似,也提供了获取地址和端口的方法。
  在构造 RpcEndpoint 时会启动指定 rpcEndpoint 上的 RpcServer,其会根据 RpcEndpoint类型(FencedRpcEndpoint 或其他)来创建不同的 AkkaRpcActor(FencedAkkaRpcActor 或
AkkaRpcActor),并将RpcEndpoint和AkkaRpcActor对应的ActorRef保存起来,AkkaRpcActor是底层 Akka 调用的实际接收者,RPC 的请求在客户端被封装成 RpcInvocation 对象,以 Akka
消息的形式发送。
  最终使用动态代理将所有的消息转发到 InvocationHandler,具体代码如下:
Flink 源码(十九):组件通信(二)源码解读
2)RpcServer 负责接收响应远端 RPC 消息请求,自身的代理对象。有两个实现:
⚫ AkkaInvocationHandler
⚫ FencedAkkaInvocationHandler
RpcServer 的启动是通知底层的 AkkaRpcActor 切换为 START 状态,开始处理远程调用请求: 
Flink 源码(十九):组件通信(二)源码解读
4.4 AkkaRpcActor
AkkaRpcActor 是 Akka 的具体实现,主要负责处理如下类型消息:
1)本地 Rpc 调用 LocalRpcInvocation
  会指派给 RpcEndpoint 进行处理,如果有响应结果,则将响应结果返还给 Sender。
2)RunAsync & CallAsync
  这类消息带有可执行的代码,直接在 Actor 的线程中执行。
3)控制消息 ControlMessages
  用来控制 Actor 行为,START 启动,STOP 停止,停止后收到的消息会丢弃掉。 
4.5 RPC 交互过程
  RPC 通信过程分为请求和响应。 
4.5.1 RPC 请求发送
  在 RpcService 中调用 connect()方法与对端的 RpcEndpoint(RpcServer)建立连接,connect()方 法 根 据 给 的 地 址 返 回InvocationHandler(AkkaInvocationHandler或FencedAkkaInvocationHandler,也就是对方的代理)。前面分析到客户端提供代理对象,代理对象会调用 AkkaInvocationHandler 的 invoke 方
法并传入 RPC 调用的方法和参数信息,代码如下:
AkkaInvocationHandler.java
Flink 源码(十九):组件通信(二)源码解读
  代码中判断所属的类,如果是 RPC 方法,则调用 invokeRpc 方法。将方法调用封装为RPCInvocation 消息。如果是本地则生成 LocalRPCInvocation,本地消息不需要序列化,如果
是远程调用则创建 RemoteRPCInvocation。
  判断远程方法调用是否需要等待结果,如果无需等待(void),则使用向 Actor 发送 tell 类型的消息,如果需要返回结果,则向 Acrot 发送 ask 类型的消息,代码如下: 
AkkaInvocationHandler.java
Flink 源码(十九):组件通信(二)源码解读

 Flink 源码(十九):组件通信(二)源码解读

 Flink 源码(十九):组件通信(二)源码解读

4.5.2 RPC 请求响应
  RPC 消息通过 RpcEndpoint 所绑定的 Actor 的 ActorRef 发送的,AkkaRpcActor 是消息接收的入口,AkkaRpcActor 在 RpcEndpoint 中构造生成,负责将消息交给不同的方法进行处
理。
AkkaRpcActor.java 
Flink 源码(十九):组件通信(二)源码解读
接收的消息有 3 种:
1)握手消息
在客户端构造时会通过 ActorSelection 发送过来。收到消息后检查接口、版本是否匹配。
AkkaRpcActor.java
Flink 源码(十九):组件通信(二)源码解读

 Flink 源码(十九):组件通信(二)源码解读

2)控制消息
  在 RpcEndpoint 调用 start 方法后,会向自身发送一条 Processing.START 消息来转换当前 Actor 的状态为 STARTED,STOP 也类似,并且只有在 Actor 状态为 STARTED 时才
会处理 RPC 请求。
AkkaRpcActor.java
Flink 源码(十九):组件通信(二)源码解读
3)RPC 消息
通过解析 RpcInvocation 获取方法名和参数类型,并从 RpcEndpoint 类中找到 Method 对象,通过反射调用该方法。如果有返回结果,会以 Akka 消息的形式发送回发送者。
AkkaRpcActor.java
Flink 源码(十九):组件通信(二)源码解读

 Flink 源码(十九):组件通信(二)源码解读