netty探索之旅3
netty探索之旅三
下面就开始我们的探索之旅
我下载的源码的版本是netty4.0。通过netty源码中的例子我们可以看到netty的是如何运行的,首先我们来看看客户端。
以下是源码的中的客户端的启动代码
路径是example\src\main\java\io\netty\example\echo\EchoClient
观察以上的代码,代码比较精炼。短短的几行代码就是netty客户端初始化所需的所有内容,这就是netty,它后面帮你做了很多事情,我们只需要简单的使用就行。
接下来我们深入的探索代码,看看到底做了什么事件。
第一句代码就很有看头:
EventLoopGroup group = new NioEventLoopGroup();
我们看看NioEventLoopGroup的构造函数一路都做了什么?
看了一路的super()的方法,其实最重要的就是MultithreadEventExecutorGroup类
1,创建一个nThreads大小的SingleThreadEventExecutor数组
2,根据数组的长度来创建chooser,如果nThreads是2的幂,则使用 PowerOfTwoEventExecutorChooser, 如果不是使用GenericEventExecutorChooser。它们的功能都是从children数组中选出一个合适的EventExecutor实例。
GenericEventExecutorChooser:
PowerOfTwoEventExecutorChooser:
3,调用newChild方法初始化children数组,由子类NioEventLoopGroup实现,创建的实际对象是NioEventLoop对象。
MultithreadEventExecutorGroup内部维护了一个事件执行(EventExecutor)的数组,当netty需要一个EventLoop对象时,会调用next()方法获取一个NioEventLoop对象,此对象就是newChild方法生成的。
NioEventLoop实例
包含属性:selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
小总结一下:
1,NioEventLoopGroup内部维护一个类型为EventExecutor数组(变量:children),构造了一个线程池
2,调用newChild抽象方法来初始化children数组
3,抽象方法newChild在 NioEventLoopGroup 中实现的,返回一个NioEventLoop实例
接着看看NioSocketChannel:
关于NioSocketChannel我使用EA画了它的类关系图,比较多,看的不是很清楚。
[img]

[/img]
比较复杂。
在netty中,channel其实就是socket的抽象,是对socket状态和读写操作的封装。当netty每建立一个连接后,都会有一个对应的channel对象。
channel有不同的类型,分别对应着不同的协议和不同的阻塞类型,常见的有:
NioSocketChannel和NioServerSocketChannel;OioSocketChannel和OioServerSocketChannel等。我们在调用channel()方法,传入一个我们需要的channel的类型,(例子中的:channel(NioSocketChannel.class)),这个方法其实就是初始化了一个 BootstrapChannelFactory
BootstrapChannelFactory(AbstractBootstrap的内部类):实现了ChannelFactory接口,其中唯一的方法是newChannel(),这个就是典型生产channel的工厂类,看看BootstrapChannelFactory的newChannel()方法实现
小总结一下:
通过以上的代码我们可以确定:
1,AbstractBootstrap中的ChannelFactory的实现是BootstrapChannelFactory
2,channel具体类型由我们自己决定(NioSocketChannel.class)。channel实例化过程就是调用BootstrapChannelFactory的newChannel()方法来完成。
以上是创建一个channel实例,那么在哪里调用这个创建实例的方法喃?
接着看:
发现这句代码没?ChannelFuture f = b.connect(HOST, PORT).sync()。connect方法里面的其他的我们先不看看看我们现在此时关心的,进入connect看到了doConnect,接着进入看到了initAndRegister。哈哈哈发现了。在initAndRegister()中有句:channel = channelFactory().newChannel();
newChannel()这时会调用NioSocketChannel类的默认构造器
这里会调用newSocket方法来打开一个新的Java NIO SocketChannel
返回到this(newSocket(DEFAULT_SELECTOR_PROVIDER));中,这时会调用一系列的构造函数最终会调用到AbstractNioByteChannel中
parent为空
ch为刚刚newSocket方法创建的SocketChannel。
继续:AbstractNioChannel
继续:AbstractChannel
到这里就完成了Channel的初始化工作。
小总结一下:
1,NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO SocketChannel
2,AbstractChannel的初始化
parent:NULL
unsafe:newUnsafe()实例化一个unsafe对象,类型是AbstractNioByteChannel.NioByteUnsafe内部类
pipeline 是 new DefaultChannelPipeline(this) 新创建的实例:一个channel一个管道。
3,AbstractNioChannel的初始化
ch:socketchannel
readInterestOp:OP_READ
socketchannel设置为非阻塞
4,NioSocketChannel
config:NioSocketChannelConfig
来了来了,管道出现了,channel和Pipeline是在这里关联起来的。在实例化channel时,就会实例化一个ChannelPipeline,在上面的分析中,实例出来的对象其实是DefaultChannelPipeline。那么DefaultChannelPipeline的又在做什么工作喃。继续往下看:
channel把前面我们创建的channel(NioSocketChannel)带入到了管道中,看到tail和head熟悉数据结构的同学都晓得了,这个是一个双向链表的头和尾,没错在DefaultChannelPipeline维护了一个AbstractChannelHandlerContext为节点(TailContext和HeadContext都继承AbstractChannelHandlerContext)的双向链表。这个我们在后文单独来详细分析一下。
链表的头HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler两个接口。
按理说HeadContext只实现ChannelOutboundHandler,为什么也实现了ChannelInboundHandler还有待研究。
链表的尾TailContext实现了ChannelInboundHandler。
它们的父类AbstractChannelHandlerContext
HeadContext传入参数 inbound = false, outbound = true。
TailContext传入参数 inbound = true, outbound = false。
HeadContext是一个 outboundHandler,TailContext是一个inboundHandler。
还记得前面我们创建出来了channel吗?channel创建出来了,那么这个channel怎么使用喃?
所以我们继续回到AbstractBootstrap类的initAndRegister()方法
newChannel()方法创建出来channel后,下面就开始init()方法,此方法由Bootstrap实现类完成。
当channel初始化完成后,会继续调用group().register(channel)来注册channel。group()返回回来的就是NioEventLoopGroup。
接着走:NioEventLoopGroup的register(channel)方法,其实就是MultithreadEventLoopGroup的register
next()还记得MultithreadEventExecutorGroup中的Chooser不(从children数组中选出一个合适的NioEventLoop对象),
继续走:最终会调用SingleThreadEventLoop的register方法
最终我们发现是调用到了unsafe的register 方法,那么接下来我们就仔细看一下 AbstractUnsafe.register方法中到底做了什么:
我们主要看2句代码:AbstractChannel.this.eventLoop = eventLoop;把eventLoop(其实就是MultithreadEventLoopGroup.next()获取的NioEventLoop对象,参照本文写EventLoop的地方)对象赋值给channel中的eventLoop对象。然后是register0(promise);在AbstractChannel类中。
register0又调用了AbstractNioChannel.doRegister:
是不是看到了很熟悉的方法了。javaChannel()返回的前面初始化ch的参数,也就是SocketChannel,这里就是把channel注册到NioEventLoop对象生成的selector上(这里就是传统看的selector注册channel的方法)。这样我们将这个 SocketChannel注册到与eventLoop关联的selector上了。
小总结一下:
以上是channel的注册过程,在netty中每个channel都会关联一个EventLoop,EventLoop负责执行channel中的所有IO操作。关联好Channel和EventLoop后,调用SocketChannel的register方法,将SocketChannel注册到selector中。
接着我们继续看EchoClient的这句:
一个handler的概念就孕育而生了。netty中的handler是添加到pipeline中的,至于pipeline的实现机制后续专门拿出来分析(这篇文章的前面只是分析了一下pipeline的初始化)。
handler()方法参数是ChannelHandler对象,上面代码的参数是ChannelInitializer实现了ChannelHandler接口,并Override了initChannel方法,把我们自定义的handler加入到ChannelPipeline中。在ChannelInitializer的channelRegistered方法中会调用initChannel方法:
小总结一下:
这里只是想说明一下handler是怎么添加到ChannelPipeline中的,至于ChannelPipeline的底层机制,后面慢讲。
最后我们来分析一下客户端连接,这篇文章就要大功告成了!
来吧这句代码:ChannelFuture f = b.connect(HOST, PORT).sync();
客户端通过调用Bootstrap的connect方法进行连接,经过追踪:
此方法中,会在eventloop线程中调用channel的connect方法, 而这个channel NioSocketChannel(在前面的channel小结讨论过)。继续看其实是调用DefaultChannelPipeline的connect方法。
ChannelPipeline中的tail字段,前面我也提到了,tail是TailContext实例,也是 AbstractChannelHandlerContext的子类,这个也是调用的AbstractChannelHandlerContext的connect方法。
先来看看findContextOutbound():从DefaultChannelPipeline内的双向链表的tail开始, 不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的 invokeConnect方法。
按照前面讲pipeline的地方,双向链表的头和尾.head是HeadContext的实例,实现了ChannelOutboundHandler接口,并且它的outbound字段为true. 因此在 findContextOutbound中,找到的AbstractChannelHandlerContext对象其实就是链表的head.HeadContext从写了connect方法。
调用了unsafe的connect。只有继续了哦。unsafe是在HeadContext构造器中pipeline.channel().unsafe()返回的,就是AbstractNioByteChannel.NioByteUnsafe内部类。
就是调用AbstractNioUnsafe中的connect方法,
doConnect方法其实又是调用到了NioSocketChannel中去了
又一次出现了javaChannel()就是socketChannel,
SocketUtils.connect:
妈呀终于看到了socketChannel.connect(remoteAddress);终于完成了socket连接。
以上就是客户端的初始化和连接了,好绕好绕!
再总结一下各个组件的关系:
NioEventLoop包含
selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
SocketChannel实例会被注册到此selector上
NioEventLoopGroup包含
children数组,存放NioEventLoop实例
NioSocketChannel包含
SocketChannel实例
DefaultChannelPipeline实例
unsafe实例
NioEventLoop实例
在实例化channel时,就会实例化一个ChannelPipeline
DefaultChannelPipeline包含
NioSocketChannel实例
TailContext实例(AbstractChannelHandlerContext)
HeadContext实例(AbstractChannelHandlerContext)
自定义的handler
AbstractChannelHandlerContext包含
DefaultChannelPipeline实例
大概梳理了一下,以后发现要加再加!
下面就开始我们的探索之旅
我下载的源码的版本是netty4.0。通过netty源码中的例子我们可以看到netty的是如何运行的,首先我们来看看客户端。
以下是源码的中的客户端的启动代码
路径是example\src\main\java\io\netty\example\echo\EchoClient
final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); }
观察以上的代码,代码比较精炼。短短的几行代码就是netty客户端初始化所需的所有内容,这就是netty,它后面帮你做了很多事情,我们只需要简单的使用就行。
接下来我们深入的探索代码,看看到底做了什么事件。
第一句代码就很有看头:
EventLoopGroup group = new NioEventLoopGroup();
我们看看NioEventLoopGroup的构造函数一路都做了什么?
看了一路的super()的方法,其实最重要的就是MultithreadEventExecutorGroup类
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { children[i] = newChild(threadFactory, args); } }
1,创建一个nThreads大小的SingleThreadEventExecutor数组
2,根据数组的长度来创建chooser,如果nThreads是2的幂,则使用 PowerOfTwoEventExecutorChooser, 如果不是使用GenericEventExecutorChooser。它们的功能都是从children数组中选出一个合适的EventExecutor实例。
GenericEventExecutorChooser:
public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; }
PowerOfTwoEventExecutorChooser:
public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; }
3,调用newChild方法初始化children数组,由子类NioEventLoopGroup实现,创建的实际对象是NioEventLoop对象。
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
MultithreadEventExecutorGroup内部维护了一个事件执行(EventExecutor)的数组,当netty需要一个EventLoop对象时,会调用next()方法获取一个NioEventLoop对象,此对象就是newChild方法生成的。
NioEventLoop实例
包含属性:selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
小总结一下:
1,NioEventLoopGroup内部维护一个类型为EventExecutor数组(变量:children),构造了一个线程池
2,调用newChild抽象方法来初始化children数组
3,抽象方法newChild在 NioEventLoopGroup 中实现的,返回一个NioEventLoop实例
接着看看NioSocketChannel:
关于NioSocketChannel我使用EA画了它的类关系图,比较多,看的不是很清楚。
[img]
[/img]
比较复杂。
在netty中,channel其实就是socket的抽象,是对socket状态和读写操作的封装。当netty每建立一个连接后,都会有一个对应的channel对象。
channel有不同的类型,分别对应着不同的协议和不同的阻塞类型,常见的有:
NioSocketChannel和NioServerSocketChannel;OioSocketChannel和OioServerSocketChannel等。我们在调用channel()方法,传入一个我们需要的channel的类型,(例子中的:channel(NioSocketChannel.class)),这个方法其实就是初始化了一个 BootstrapChannelFactory
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new BootstrapChannelFactory<C>(channelClass)); }
BootstrapChannelFactory(AbstractBootstrap的内部类):实现了ChannelFactory接口,其中唯一的方法是newChannel(),这个就是典型生产channel的工厂类,看看BootstrapChannelFactory的newChannel()方法实现
public T newChannel() { try { return clazz.newInstance();//创建一个channel的实例,比如:NioSocketChannel } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
小总结一下:
通过以上的代码我们可以确定:
1,AbstractBootstrap中的ChannelFactory的实现是BootstrapChannelFactory
2,channel具体类型由我们自己决定(NioSocketChannel.class)。channel实例化过程就是调用BootstrapChannelFactory的newChannel()方法来完成。
以上是创建一个channel实例,那么在哪里调用这个创建实例的方法喃?
接着看:
发现这句代码没?ChannelFuture f = b.connect(HOST, PORT).sync()。connect方法里面的其他的我们先不看看看我们现在此时关心的,进入connect看到了doConnect,接着进入看到了initAndRegister。哈哈哈发现了。在initAndRegister()中有句:channel = channelFactory().newChannel();
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory().newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
newChannel()这时会调用NioSocketChannel类的默认构造器
public NioSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
这里会调用newSocket方法来打开一个新的Java NIO SocketChannel
private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } }
返回到this(newSocket(DEFAULT_SELECTOR_PROVIDER));中,这时会调用一系列的构造函数最终会调用到AbstractNioByteChannel中
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
parent为空
ch为刚刚newSocket方法创建的SocketChannel。
继续:AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
继续:AbstractChannel
protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
到这里就完成了Channel的初始化工作。
小总结一下:
1,NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO SocketChannel
2,AbstractChannel的初始化
parent:NULL
unsafe:newUnsafe()实例化一个unsafe对象,类型是AbstractNioByteChannel.NioByteUnsafe内部类
pipeline 是 new DefaultChannelPipeline(this) 新创建的实例:一个channel一个管道。
3,AbstractNioChannel的初始化
ch:socketchannel
readInterestOp:OP_READ
socketchannel设置为非阻塞
4,NioSocketChannel
config:NioSocketChannelConfig
来了来了,管道出现了,channel和Pipeline是在这里关联起来的。在实例化channel时,就会实例化一个ChannelPipeline,在上面的分析中,实例出来的对象其实是DefaultChannelPipeline。那么DefaultChannelPipeline的又在做什么工作喃。继续往下看:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
channel把前面我们创建的channel(NioSocketChannel)带入到了管道中,看到tail和head熟悉数据结构的同学都晓得了,这个是一个双向链表的头和尾,没错在DefaultChannelPipeline维护了一个AbstractChannelHandlerContext为节点(TailContext和HeadContext都继承AbstractChannelHandlerContext)的双向链表。这个我们在后文单独来详细分析一下。
链表的头HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler两个接口。
按理说HeadContext只实现ChannelOutboundHandler,为什么也实现了ChannelInboundHandler还有待研究。
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
链表的尾TailContext实现了ChannelInboundHandler。
TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); }
它们的父类AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; }
HeadContext传入参数 inbound = false, outbound = true。
TailContext传入参数 inbound = true, outbound = false。
HeadContext是一个 outboundHandler,TailContext是一个inboundHandler。
还记得前面我们创建出来了channel吗?channel创建出来了,那么这个channel怎么使用喃?
所以我们继续回到AbstractBootstrap类的initAndRegister()方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory().newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); } return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
newChannel()方法创建出来channel后,下面就开始init()方法,此方法由Bootstrap实现类完成。
void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(handler()); final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
当channel初始化完成后,会继续调用group().register(channel)来注册channel。group()返回回来的就是NioEventLoopGroup。
接着走:NioEventLoopGroup的register(channel)方法,其实就是MultithreadEventLoopGroup的register
public ChannelFuture register(Channel channel) { return next().register(channel); }
next()还记得MultithreadEventExecutorGroup中的Chooser不(从children数组中选出一个合适的NioEventLoop对象),
继续走:最终会调用SingleThreadEventLoop的register方法
public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
最终我们发现是调用到了unsafe的register 方法,那么接下来我们就仔细看一下 AbstractUnsafe.register方法中到底做了什么:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
我们主要看2句代码:AbstractChannel.this.eventLoop = eventLoop;把eventLoop(其实就是MultithreadEventLoopGroup.next()获取的NioEventLoop对象,参照本文写EventLoop的地方)对象赋值给channel中的eventLoop对象。然后是register0(promise);在AbstractChannel类中。
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
register0又调用了AbstractNioChannel.doRegister:
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
是不是看到了很熟悉的方法了。javaChannel()返回的前面初始化ch的参数,也就是SocketChannel,这里就是把channel注册到NioEventLoop对象生成的selector上(这里就是传统看的selector注册channel的方法)。这样我们将这个 SocketChannel注册到与eventLoop关联的selector上了。
小总结一下:
以上是channel的注册过程,在netty中每个channel都会关联一个EventLoop,EventLoop负责执行channel中的所有IO操作。关联好Channel和EventLoop后,调用SocketChannel的register方法,将SocketChannel注册到selector中。
接着我们继续看EchoClient的这句:
.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });
一个handler的概念就孕育而生了。netty中的handler是添加到pipeline中的,至于pipeline的实现机制后续专门拿出来分析(这篇文章的前面只是分析了一下pipeline的初始化)。
handler()方法参数是ChannelHandler对象,上面代码的参数是ChannelInitializer实现了ChannelHandler接口,并Override了initChannel方法,把我们自定义的handler加入到ChannelPipeline中。在ChannelInitializer的channelRegistered方法中会调用initChannel方法:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }
小总结一下:
这里只是想说明一下handler是怎么添加到ChannelPipeline中的,至于ChannelPipeline的底层机制,后面慢讲。
最后我们来分析一下客户端连接,这篇文章就要大功告成了!
来吧这句代码:ChannelFuture f = b.connect(HOST, PORT).sync();
客户端通过调用Bootstrap的connect方法进行连接,经过追踪:
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { channel.connect(remoteAddress, promise); } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
此方法中,会在eventloop线程中调用channel的connect方法, 而这个channel NioSocketChannel(在前面的channel小结讨论过)。继续看其实是调用DefaultChannelPipeline的connect方法。
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return tail.connect(remoteAddress, localAddress); }
ChannelPipeline中的tail字段,前面我也提到了,tail是TailContext实例,也是 AbstractChannelHandlerContext的子类,这个也是调用的AbstractChannelHandlerContext的connect方法。
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (!validatePromise(promise, false)) { return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; }
先来看看findContextOutbound():从DefaultChannelPipeline内的双向链表的tail开始, 不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的 invokeConnect方法。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } }
按照前面讲pipeline的地方,双向链表的头和尾.head是HeadContext的实例,实现了ChannelOutboundHandler接口,并且它的outbound字段为true. 因此在 findContextOutbound中,找到的AbstractChannelHandlerContext对象其实就是链表的head.HeadContext从写了connect方法。
public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); }
调用了unsafe的connect。只有继续了哦。unsafe是在HeadContext构造器中pipeline.channel().unsafe()返回的,就是AbstractNioByteChannel.NioByteUnsafe内部类。
protected class NioByteUnsafe extends AbstractNioUnsafe
就是调用AbstractNioUnsafe中的connect方法,
........ if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } .......
doConnect方法其实又是调用到了NioSocketChannel中去了
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
又一次出现了javaChannel()就是socketChannel,
SocketUtils.connect:
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { return socketChannel.connect(remoteAddress); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
妈呀终于看到了socketChannel.connect(remoteAddress);终于完成了socket连接。
以上就是客户端的初始化和连接了,好绕好绕!
再总结一下各个组件的关系:
NioEventLoop包含
selectorProvider:selector的提供者,SelectorProvider.provider()获取
selector:具体的selector。provider.openSelector()获取一个selector对象
SocketChannel实例会被注册到此selector上
NioEventLoopGroup包含
children数组,存放NioEventLoop实例
NioSocketChannel包含
SocketChannel实例
DefaultChannelPipeline实例
unsafe实例
NioEventLoop实例
在实例化channel时,就会实例化一个ChannelPipeline
DefaultChannelPipeline包含
NioSocketChannel实例
TailContext实例(AbstractChannelHandlerContext)
HeadContext实例(AbstractChannelHandlerContext)
自定义的handler
AbstractChannelHandlerContext包含
DefaultChannelPipeline实例
大概梳理了一下,以后发现要加再加!