Netty处理程序未调用
我正在尝试使用一个简单的服务器-客户端应用程序(请参见下面的代码)进入Netty.
I'm trying to get into Netty using a simple server-client application (code see below).
我正在努力解决两个问题:
I'm struggling with two issues:
-
ConfigServerHandler响应. ConfigClientHandler被正确调用.但是FeedbackServerHandler分别.从不调用FeedbackClientHandler.为什么?根据文档,处理程序应一个接一个地调用.
the ConfigServerHandler resp. ConfigClientHandler is called correctly. But the FeedbackServerHandler resp. FeedbackClientHandler is never called. Why? According to the documentation the Handlers should be called one after another.
我想要几个处理程序.这些处理程序中的每个处理程序仅对另一端发送的某些消息感兴趣(例如,由客户端发送,由服务器接收).
I'd like to have several handlers. Each of these handlers is interested in only some of the messages that are send by the other side (e.g. send by client, received by server).
- 在处理程序(channelRead)收到消息后,是否应该过滤消息?如何区分不同的字符串?对于不同的对象,通过解析它们应该很容易.
- 是否可以为SocketChannel定义不同的ChannelPipelines?
- 更多方法?
感谢您的帮助!
KJ
这是创建服务器的方式:
This is how the server is created:
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ConfigServerHandler(),
new FeedbackServerHandler());
}
});
b.bind(mPort).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
其中一个Handler类(FeedbackServerHandler的功能完全相同,但解析为Integer):
One of the Handler classes (the FeedbackServerHandler does exactly the same but parses into Integer):
public class ConfigServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigServerHandler::channelRead, " +(String)msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
客户端看起来非常相似:
The client side looks pretty similar:
public Client(String host, int port) throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(
new ObjectEncoder(),
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ConfigClientHandler(),
new FeedbackClientHandler());
}
});
b.connect(host, port).sync().channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
这是一个客户端处理程序(另一个处理程序发送一个Integer消息,并在'channelRead'方法中解析为Integer):
And here is one of the Client side handlers (the other one sends an Integer message and parses into Integer in the 'channelRead' method):
public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
private final String firstMessage = "blubber";
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("ConfigClientHandler::channelActive");
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
您正在将ChannelInboundHandlerAdapter
用于中间"处理程序ConfigXxxxxHandler
.
You're using ChannelInboundHandlerAdapter
, which is fine, for your "middle" handler ConfigXxxxxHandler
.
但是您使用channelRead
方法,然后在ctx.write(msg)
内部使用. ctx.write(msg)
将首先通过上一个处理程序(ObjectDecoder
)将消息写回到其他服务器,而不是通过下一个处理程序(在您的情况下为FeedbackClientHandler
).
But you use channelRead
method and then use inside ctx.write(msg)
. ctx.write(msg)
will write the msg back to the other server through the previous handler first (ObjectDecoder
), not to the next handler (FeedbackClientHandler
in your case).
如果要将消息发送到下一个处理程序,则应使用以下内容:
You should use the following if you want to send the message to the next handler:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
ctx.fireChannelRead(msg);
}
当然也没有channelReadComplete
中的ctx.flush()
(因为不再在此处写入).
但是,当然,在最终的FeedbackClientHandler
中,对ctx.write(yourNewMessage)
使用flush方法或对ctx.writeAndFlush(yourNewMessage)
使用
And of course no ctx.flush()
in channelReadComplete
(since no more write there).
But in your final FeedbackClientHandler
, of course, use the flush method with ctx.write(yourNewMessage)
or use ctx.writeAndFlush(yourNewMessage)
.
因此恢复:
-
ctx.write
会将消息发送到线路,然后发送到先前的处理程序,再到通道,再发送到网络,以 Outbound 方式 -
ctx.fireChannelRead
会将消息发送到下一个处理程序(相反的方式),因此入站方式
-
ctx.write
will send the message to the wire, so to the previous handler down to the channel then to the network, so Outbound way -
ctx.fireChannelRead
will send the message to the next following handler (opposite way), so Inbound way
请参见 http://netty.io /wiki/new-and-noteworthy-in-4.0.html#wiki-h4-16 了解详情.
您也许还应该反转编码器/解码器,因为通常最好先在管道中安装解码器,然后再安装编码器.
You should perhaps also invert Encoder/Decoder, since in general it is good idea to have first the decoder, then the encoder in the pipeline.
p.addLast(
new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
new ObjectEncoder(),
new ConfigClientHandler(),
new FeedbackClientHandler());