七、pipeLine概述
pipeline的初始化
-
pipeline在创建Channel的时候被创建
在前面几节可以看到在服务端和客户端创建Channel的时候都会调用AbstractChannel构造方法创建Pipeline:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
进入newChannelPipeline方法,最终会调用DefaultChannelPipeline()方法:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在该方法中创建了tail和head,并指明两者关系为双向链表结构。
并将Channel和Pipeline绑定。
-
Pipeline节点数据结构:ChannelHandlerContext
ChannelHandlerContext继承自三个接口AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker。
AttributeMap中为一个Map结构,存放了一些属性。
ChannelInboundInvoker和ChannelOutboundInvoker定义了一些事件如传播读事件、绑定事件、异常事件等。 -
Pipeline中的两大哨兵:head和tail
在初始化ChannelPipeline时,我们看到在其中创建了HeadContext和TailContext。
TailContext实现了ChannelInboundHandler,说明是一个inbound处理器,再看一下构造函数:inbound标识为true
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME,inbound: true, outbound: false);
setAddComplete();
}
//本身是一个节点,包含的处理器也是自身
public ChannelHandler handler() {
return this;
}
HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler:但在构造函数中inbound标识为false,outbound标识为true
//unsafe主要是读写等相关的操作
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
//设置为Channel的unsafe
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
@Override
public ChannelHandler handler() {
return this;
}
添加和删除ChannelHandler
添加ChannelHandler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//流水线管理子通道中的Handler处理器
//向子通道流水线中添加一个Handler处理器
ch.pipeline().addLast(new SimpleNettyServerHandler());
}
});
跟进addlast方法,到达DefaultChannelPipeline.addlast():
- 判断是否重复添加checkMultiplicity(handler);
如果未添加并且不是可共享(利用反射获取注解Sharable 判断),则将added属性置为true - 创建节点并添加至链表
- 创建节点
newCtx = newContext(group, filterName(name, handler), handler);
filterName方法
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
//创建一个名字
return generateName(handler);
}
//检查是否重复
checkDuplicateName(name);
return name;
}
newContext方法返回DefaultChannelHandlerContext.
new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
- 添加节点
addLast0(newCtx);
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
- 回调完成添加事件
//在此处,若executor 为null则获取channel().eventLoop()赋值给executor
EventExecutor executor = newCtx.executor();
//判断是否为当前线程
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
//在当前线程则直接执行
callHandlerAdded0(newCtx);
在此处添加handler,
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
最终调用的为ChannelInitializer的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
initChannel(ctx)回调至初始化添加代码,将自定义的Handler添加到Channel上。
最终移除ChannelHandlerContext
删除ChannelHandler
- 找到节点
getContextOrDie(handler);
---》
//遍历查找节点并返回
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
- 链表删除节点
assert ctx != head && ctx != tail;
//标准链表删除
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
- 回调删除Handler事件
//获取当前executor
EventExecutor executor = ctx.executor();
//判断是否在当前线程
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
callHandlerRemoved0(ctx);
----》回调到Handler的handlerRemoved方法
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
事件的传播
Context 包装 handler,多个 Context 在 pipeline 中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。而节点中间的传递通过 AbstractChannelHandlerContext 类内部的 fire 系列方法,找到当前节点的下一个节点不断的循环传播。
Handler涉及的环节包括:数据包解码、业务处理、目标数据编码、把数据写入到通道中,有出站和入站两种操作类型:
如下图:
- 入站处理,触发的方向为:自底向上,由Netty的内部(如通道)到ChannelInboundHandler入站处理器。
- 出站方向,触发的方向为:自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。
按照这种方向来分,前面数据包解码、业务处理这两个环节属于入站处理器的工作,后面目标数据编码、把数据包写到通道中属于出站处理器的工作。
ChannelInboundHandler通道入站处理器
当数据或者信息如占到Netty通道时,Netty将触发入站处理器ChannelInboundHandler对应的入站API,进行入站操作。
ChannelInboundHandler的主要操作如下:
inbound事件的传播
- 以ChannelRead事件为例
handler之间的传播信息通过fireXXX方法:其区别是从哪个节点开始传播。
- ctx.fireChannelRead(msg); 从当前节点往下传播事件
- ctx.channel().pipeline().fireChannelRead(msg);从头节点HeadContext开始传播
新建一个ChannelInboundHandler,如下:
public class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandlerA: "+msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().pipeline().fireChannelRead("hello world");
}
}
在ChannelActive处打上断点,进入:
从head开始传播:
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
....
//此处next为headContext,调用headContext的involveChannelRead方法:
next.invokeChannelRead(m);
....
((ChannelInboundHandler) handler()).channelRead(this, msg);
....
//没有做处理,继续向下传播
ctx.fireChannelRead(msg);
....
invokeChannelRead(findContextInbound(), msg);
看一下findContextInbound()方法,找到下一个Inbound的ctx
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
继续向下:
next.invokeChannelRead(m);
....
//此处调用的为InboundHandlerA 的ChannelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg)
继续向下,可以看到
此处findContextInbound返回的为TailContext
invokeChannelRead(findContextInbound(), msg);
....
//未处理信息的处理
onUnhandledInboundMessage(msg){
....
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//释放资源
ReferenceCountUtil.release(msg);
}
....
}
Tips: SimpleInboundHandler会帮你自动释放资源
outbound事件的传播
当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler通道出站处理器,完成Netty通道到底层的操作。比方说,建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作,如下:
出站处理的方向是通过上层Netty通道去操作底层Java IO通道。
public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutboundHandlerA: " + msg);
ctx.write(msg, promise);
}
//模拟向客户端发送消息
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
//从头结点开始传播
ctx.channel().write("hello world");
//从当前节点开始往下传播ctx.write("hello world");
},3, TimeUnit.SECONDS);
}
}
在handlerAdded处打上断点,跟进:
pipeline.write(msg);
....
//调用tailcontext的write方法
tail.write(msg);
....
//newPromise包装Channel
write(msg, newPromise());
....
write(msg, false, promise);
....
AbstractChannelHandlerContext next = findContextOutbound(){
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
....
next.invokeWrite(m, promise);
调用到自定义的OutboundHandlerA的write方法。
当处理完后,继续ctx.write方法则会调用到HeadContext的unsafe.write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}