ctx.close() 和 ctx.channel().close() 到底有何区别? 从现象来看 从源码来说

我最近在项目中,遇到一个问题,ctx.close() 和 ctx.channel().close() 到底有何区别?

即调用 ChannelHandlerContext#close() 和 Channel#close() 有何不同?

建议先看一下下面这篇文章:

[翻译]Netty4中 Ctx.close 与 Ctx.channel.close 的区别 跳转 click here

假如我们有这样一个 双向处理器 SomeHandler

import io.netty.channel.*;

@ChannelHandler.Sharable
public class SomeHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(ctx.name() + " channelRead: " + msg);
        String result = (String) msg;
        if (("ctx.close." + ctx.name()).equals(result)) {
            ctx.close();
        } else if (("ctx.channel.close." + ctx.name()).equals(result)) {
            ctx.channel().close();
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ctx.fireChannelInactive();
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println(ctx.name() + " close");
        ctx.close(promise);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println(ctx.name() + " write");
        ctx.write(String.format("[%s]%s", ctx.name(), msg), promise);
    }
}

假如,我们的服务器端构建的管道:

ChannelPipeline p = ...;
p.addLast("A", new SomeHandler());
p.addLast("B", new SomeHandler());
p.addLast("C", new SomeHandler());
...
完整的 服务端代码 点击此处

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        StringDecoder stringDecoder = new StringDecoder();
        StringEncoder stringEncoder = new StringEncoder();
        SomeHandler aHandler = new SomeHandler();
        SomeHandler bHandler = new SomeHandler();
        SomeHandler cHandler = new SomeHandler();
        ServerBootstrap bootstrap = new ServerBootstrap()
                .channel(NioServerSocketChannel.class)
                .group(boss, worker)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) {
                        channel.pipeline()
                                .addLast("decoder", stringDecoder)
                                .addLast("encoder", stringEncoder)
                                .addLast("A", aHandler)
                                .addLast("B", bHandler)
                                .addLast("C", cHandler);
                    }
                });
        bootstrap.bind(8098).sync();
    }
}

如果,客户端 发送给服务端的数据是 ctx.close.B,输出日志将是:

A channelRead: ctx.close.B
B channelRead: ctx.close.B
A close

这里,你没有看到输出 B close ,但是不用惊讶,因为,你调用的是上下文 ctx.close() 方法,它是不会再去调用当前处理器对象的 close 方法的。

如果,客户端 发送给服务端的数据是 channel.close.B,输出的日志将是:

A channelRead: ctx.channel.close.B
B channelRead: ctx.channel.close.B
C close
B close
A close

从源码来说

ctx.close()

通常,ctx 的类是 DefaultChannelHandlerContext,在调用 close 或者 writeAndFlush 这类出站方法时,最终会调用 AbstractChannelHandlerContext 的方法:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelFuture close() {
        return close(newPromise());
    }
    
    @Override
    public ChannelFuture close(final ChannelPromise promise) {
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }
        
        // 这个方法寻找下一个能够处理 CLOSE 事件的出站处理器
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeClose(promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeClose(promise);
                }
            }, promise, null, false);
        }

        return promise;
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        
        // 这个方法寻找下一个能够处理相应事件的处理器
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }    


    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            // 向前寻找出站处理器
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
}

注意哦,出站方法是向前寻找处理器。

ctx.channel().close()

通常,ctx.channel() 返回的类可以是 NioSocketChannel,在调用 close 或者 writeAndFlush 这类出站方法时,
会调用 AbstractChannelclose 或者 writeAndFlush 方法,
再接着就是调用 DefaultChannelPipelineclose 或者 writeAndFlush 方法:

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelFuture close(ChannelPromise promise) {
        return tail.close(promise);
    }

    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
}

你会发现,都是从管道的 TailContext 开始调用,因此所有符合要求的出站处理器都将被执行。

但是,你还要注意,处理器的写法

public class SomeHandler extends ChannelOutboundHandlerAdapter {
    /**
     * 如果不覆写这个 close 方法,直接运行父类的方法,也是正常的。
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        // 如果出站处理器中,不加下面这段话,可能会导致 Channel 无法被正常关闭!
        ctx.close(promise);
    }
    /**
     * 如果不覆写这个 write 方法,直接运行父类的方法,也是正常的。
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 如果出站处理器中,不加下面这段话,可能会导致 Channel 无法正常发出消息!
        ctx.write(msg, promise);
    }
}