netty 通路处理器上下文
netty 通道处理器上下文
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
引言:
前面一篇文章我们主要看了一下通道处理器上下文接口的定义,先来回顾一下:
通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。
今天我们来看上下文的抽象实现。
从上面可以看出抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。
下面我们来看Inbound事件处理,
先来看上下文处理通道的channelRegistered事件
上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
2.
再来看异常处理
//StackTraceElement
再来看,触发通道处理器异常处理方法exceptionCaught
从上面可以看出,上下文处理通道fireChannelRegistered事件,
如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;
触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,
只不过触发的是通道处理器的相应事件;
前面的文章已讲,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看Outbound地址绑定事件的处理:
再来看一下寻找Outbound处理器:
最后来看一下异常处理
//PromiseNotificationUtil
从上面可以看出上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务
失败,并log异常日志。
上下文,处理其他Outbound事件的思路,基本相同,
前面文章已将,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看其他方法,很简单,不用说了:
再来通道处理器上下文的默认实现:
通道处理器上下文默认实现DefaultChannelHandlerContext内部关联一个通道处理器。
总结:
抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。
上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发
上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。
上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,
寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。
netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;
不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环
或上下文的子事件执行器。
附:
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道处理器上下文定义:http://donald-draper.iteye.com/blog/2389214
引言:
前面一篇文章我们主要看了一下通道处理器上下文接口的定义,先来回顾一下:
通道处理器上下文ChannelHandlerContext,使通道处理器可以与管道和管道中其他处理器进行交互。当IO事件发生时,处理可以将事件转发给所属管道的下一个通道处理器,同时可以动态修改处理器所属的管道。通过上下文可以获取关联通道,处理器,事件执行器,上下文名,所属管道等信息。同时可以通过AttributeKey存储上下文属性,用alloc方法获取通道上下文的字节buf分配器,用于分配buf。
今天我们来看上下文的抽象实现。
import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.PromiseNotificationUtil; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next;//上下文后继 volatile AbstractChannelHandlerContext prev;//上下文前驱 //上下文状态 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState"); /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called. 通道处理器handlerAdded事件将要触发 */ private static final int ADD_PENDING = 1; /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. 通道处理器handlerAdded事件已经触发 */ private static final int ADD_COMPLETE = 2; /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. 通道处理器上下文已经从所属管道移除 */ private static final int REMOVE_COMPLETE = 3; /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. 上下文初始化状态 */ private static final int INIT = 0; private final boolean inbound;//Inbound处理器上下文标志 private final boolean outbound;//Outbound处理器上下文标志 private final DefaultChannelPipeline pipeline;//上下文所属管道 private final String name;//上下文名,对应通道处理器添加时的name private final boolean ordered;//事件执行器是否为顺序执行器 // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. //如果没有子事件执行器可用,为空,否则为子事件执行器 final EventExecutor executor;//事件执行器 private ChannelFuture succeededFuture;//通道异步任务结果 // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. //延时创建任务用于在不同的执行器中,处理触发事件,这些任务需要为volatile,最糟糕的情况下,仅仅创建比实际需要,多一点的任务 private Runnable invokeChannelReadCompleteTask;//读完成任务线程 private Runnable invokeReadTask;//上下文读任务线程 private Runnable invokeChannelWritableStateChangedTask;//通道可写状态改变任务线程 private Runnable invokeFlushTask;//上下文刷新任务线程 private volatile int handlerState = INIT; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { //检查上下为name是否为空 this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. //是否为Ordered事件执行器 ordered = executor == null || executor instanceof OrderedEventExecutor; } }
从上面可以看出抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。
下面我们来看Inbound事件处理,
先来看上下文处理通道的channelRegistered事件
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor();//获取上下文事件执行器 //如果事件执行器在当前事务循环,则直接调用上下文invokeChannelRegistered方法 if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { //否则创建一个线程执行上下文invokeChannelRegistered方法,并有上下文事务执行器运行 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } //触发通道channelRegistered事件 private void invokeChannelRegistered() { //如果通道处理器已添加到管道 if (invokeHandler()) { try { //触发通道处理器的channelRegistered事件 ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { //通知异常 notifyHandlerException(t); } } else { //转发事件消息 fireChannelRegistered(); } }
上述方法有一下几点要看
1.
//判断通道处理器已添加到管道
/** * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called * yet. If not return {@code false} and if called or could not detect return {@code true}. *确保通道处理器的handlerAdded方法已触发。 * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}. 如果失败,则不会调用通道处理器的相关事件处理方法,而是转发事件。这种情况主要针对通道处理器已经添加到管道, 但通道处理器handlerAdded方法没有被调用的情况,即通道处理器关联的上下文已经添加管道上下文链,但并没有更新上下文状态 和触发通道处理器的handlerAdded方法。 */ private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
2.
//转发事件消息 Override public ChannelHandlerContext fireChannelRegistered() { //转发事件给上下文所属管道的下一个上下文 invokeChannelRegistered(findContextInbound()); return this; } //获取上下文所属管道的下一个Inbound上下文 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
再来看异常处理
//通知异常 notifyHandlerException(t);
private void notifyHandlerException(Throwable cause) { //判断异常堆栈信息中是否存在exceptionCaught方法信息 if (inExceptionCaught(cause)) { //如果是通道处理器exceptionCaught方法抛出的异常,则直接log if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event", cause); } return; } //否则,触发通道处理器异常处理方法exceptionCaught invokeExceptionCaught(cause); }
//判断异常堆栈信息中是否存在exceptionCaught方法信息 private static boolean inExceptionCaught(Throwable cause) { do { //获取异常堆栈frame信息 StackTraceElement[] trace = cause.getStackTrace(); if (trace != null) { for (StackTraceElement t : trace) { if (t == null) { break; } //是否是exceptionCaught方法抛出的异常,是则返回true if ("exceptionCaught".equals(t.getMethodName())) { return true; } } } cause = cause.getCause(); } while (cause != null); return false; }
//StackTraceElement
package java.lang; import java.util.Objects; /** * An element in a stack trace, as returned by {@link * Throwable#getStackTrace()}. Each element represents a single stack frame. * All stack frames except for the one at the top of the stack represent * a method invocation. The frame at the top of the stack represents the * execution point at which the stack trace was generated. Typically, * this is the point at which the throwable corresponding to the stack trace * was created. 用于描述异常堆栈的frame * * @since 1.4 * @author Josh Bloch */ public final class StackTraceElement implements java.io.Serializable { // Normally initialized by VM (public constructor added in 1.5) private String declaringClass;//异常类 private String methodName;//异常发生点方法 private String fileName;//异常发生的文件名 private int lineNumber;//异常发生的行号 ... }
再来看,触发通道处理器异常处理方法exceptionCaught
private void invokeExceptionCaught(final Throwable cause) { //如果通道处理器已添加到管道 if (invokeHandler()) { try { //触发通道处理器exceptionCaught事件 handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", error, cause); } } } else { //否则转递IO异常事件给管道中的下一个Inbound处理器 fireExceptionCaught(cause); } } @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { invokeExceptionCaught(next, cause); return this; }
从上面可以看出,上下文处理通道fireChannelRegistered事件,
如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;
触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,
只不过触发的是通道处理器的相应事件;
前面的文章已讲,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看Outbound地址绑定事件的处理:
@Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); }
//创建通道任务DefaultChannelPromise /** * The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create * a new {@link ChannelPromise} rather than calling the constructor explicitly. */ public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { private final Channel channel; private long checkpoint; ... /** * Creates a new instance. * * @param channel * the {@link Channel} associated with this future */ public DefaultChannelPromise(Channel channel, EventExecutor executor) { super(executor); this.channel = channel; } }
//绑定socket地址 @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { //非可写通道任务,直接返回 // cancelled return promise; } //从当前上下文开始(尾部),向前找到第一个Outbound上下文,处理地址绑定事件 final AbstractChannelHandlerContext next = findContextOutbound(); //获取上下为事件执行器 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { //如果事件执行器线程在事件循环中,则直接委托给invokeBind next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } //触发通道处理器地址绑定事件 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) {//如果通道处理器已经添加到管道中 try { //触发Outbound通道处理器的bind事件方法 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { //通知异常 notifyOutboundHandlerException(t, promise); } } else { //否则传递绑定事件给管道中的下一个Outbound上下文 bind(localAddress, promise); } } private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { //执行事件失败 promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } }
再来看一下寻找Outbound处理器:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
最后来看一下异常处理
//通知异常 notifyOutboundHandlerException(t, promise);
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return // false. //直接委托给异步任务结果通知工具PromiseNotificationUtil PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger); }
//PromiseNotificationUtil
package io.netty.util.internal; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; /** * Internal utilities to notify {@link Promise}s. 内部异步任务结果通知工具 */ public final class PromiseNotificationUtil { private PromiseNotificationUtil() { } /** * Try to cancel the {@link Promise} and log if {@code logger} is not {@code null} in case this fails. 通知异步任务取消 */ public static void tryCancel(Promise<?> p, InternalLogger logger) { if (!p.cancel(false) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to cancel promise because it has succeeded already: {}", p); } else { logger.warn( "Failed to cancel promise because it has failed already: {}, unnotified cause:", p, err); } } } /** * Try to mark the {@link Promise} as success and log if {@code logger} is not {@code null} in case this fails. 通知异步任务成功 */ public static <V> void trySuccess(Promise<? super V> p, V result, InternalLogger logger) { if (!p.trySuccess(result) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as success because it has succeeded already: {}", p); } else { logger.warn( "Failed to mark a promise as success because it has failed already: {}, unnotified cause:", p, err); } } } /** * Try to mark the {@link Promise} as failure and log if {@code logger} is not {@code null} in case this fails. 通知异步任务取消失败 */ public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) { if (!p.tryFailure(cause) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause); } else { logger.warn( "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", p, ThrowableUtil.stackTraceToString(err), cause); } } } }
从上面可以看出上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务
失败,并log异常日志。
上下文,处理其他Outbound事件的思路,基本相同,
前面文章已将,可以参考
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
我们这里不再赘述。
再来看其他方法,很简单,不用说了:
@Override public Channel channel() { return pipeline.channel(); } @Override public ChannelPipeline pipeline() { return pipeline; } @Override public ByteBufAllocator alloc() { return channel().config().getAllocator(); } @Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } } @Override public String name() { return name; } @Override public boolean isRemoved() { return handlerState == REMOVE_COMPLETE; } @Override public <T> Attribute<T> attr(AttributeKey<T> key) { return channel().attr(key); } @Override public <T> boolean hasAttr(AttributeKey<T> key) { return channel().hasAttr(key); } //返回可读的资源泄漏信息,用于追踪资源泄漏情况 @Override public String toHintString() { return '\'' + name + "' will handle the message from this point."; } @Override public String toString() { return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')'; }
再来通道处理器上下文的默认实现:
//通道处理器上下文默认实现DefaultChannelHandlerContext: package io.netty.channel; import io.netty.util.concurrent.EventExecutor; final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext { private final ChannelHandler handler;//关联通道处理器 DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; } }
通道处理器上下文默认实现DefaultChannelHandlerContext内部关联一个通道处理器。
总结:
抽象通道处理器上下文AbstractChannelHandlerContext,拥有一个前驱和后继上下文,用于在管道中传递IO事件;通道处理器总共有四个状态,分别为初始化,正在添加到管道,已添加管道和从管道移除状态;上下文同时关联一个管道;Inbound和Outbound两个用于判断上下文的类型,决定了上下文是处理器Inbound事件还是Outbound事件;一个事件执行器executor,当上下文执行器不在当前事务循环中时,用于执行IO事件操作;同时有一些延时任务,如上下文读任务,上下文刷新任务,读完成任务和通道可写状态改变任务。上下文构造,主要是初始化上下文name,所属管道,事件执行器,上下文类型。上下文关联的通道通道处理器在具体的实现中定义,比如通道处理器上下文默认实现为DefaultChannelHandlerContext,内部关联一个通道处理器。
上下文处理通道fireChannelRegistered事件,如果上下文事件执行器在当前事务循环,则直接在当前线程,执行触发上下文关联通道处理器的channelRegistered事件任务,否则,创建一个线程执行事件任务,并由上下文事务执行器运行;触发上下文关联通道处理器的channelRegistered事件任务,首先判断上下文是否已经添加到管道,已添加,则触发
上下文关联通道处理器的channelRegistered事件,否则转发事件给上下文所属管道的下一个Inbound上下文。其他Inbound事件的处理过程与fireChannelRegistered方法思路相同,只不过触发的是通道处理器的相应事件;如果Inbound事件处理过程中,异常发生,首先检查异常是不是通道处理器的exceptionCaught方法抛出,是,则直接log,否则触发上下文关联通道处理器的exceptionCaught事件。
上下文处理关联通道处理器的地址绑定bind事件,首先从所属管道上下文链的尾部开始,
寻找Outbound上下文,找到后,获取上下文的事件执行器,如果事件执行器线程在当前事件循环中,则触发上下文关联通道处理器地址绑定事件,否则创建一个线程,执行事件触发操作,并交由事件执行器执行;触发上下文关联通道处理器地址绑定事件,首先判断上下文关联通道处理器是否已经添加到管道,如果以添加,则触发Outbound通道处理器的bind事件方法,否则,传递地址绑定事件给管道中的下一个Outbound上下文。如果在Outbound事件处理器过程中,出现异常则直接委托给异步任务结果通知工具PromiseNotificationUtil,通知异步任务失败,并log异常日志。
netty的通道处理器上下文和mina的会话有点像,都拥有描述通道Handler的Context;
不同的时mina中的会话与通道直接关联,而netty上下文与通道间是通过Channel管道关联起来,mina中的过滤链是依附于会话,而netty上下文依附于Channel管道,mina中的IO事件执行器为IoProcessor,netty中的IO事件的处理委托给事件循环
或上下文的子事件执行器。
附:
/** * A hint object that provides human-readable message for easier resource leak tracking. */ public interface ResourceLeakHint { /** * Returns a human-readable message that potentially enables easier resource leak tracking. */ String toHintString(); }