Netty TCP粘包/拆包 自定义字符串跟字符串长度 2种分包方式
Netty TCP粘包/拆包 自定义字符串和字符串长度 2种分包方式
package bhz.netty.ende1; import java.nio.ByteBuffer; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Server { /** * 用分隔符区分每条请求信息 */ public static void main(String[] args) throws Exception{ //1 创建2个线程,一个是负责接收客户端的连接。一个是负责进行数据传输的 EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 创建服务器辅助类 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //设置特殊分隔符 (这样就可以判断每次请求什么时候结束) ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //设置定长字符串接收 //sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //设置字符串形式的解码(这样服务器端就可以以字符串形式接收到数据) sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 绑定连接 ChannelFuture cf = b.bind(8765).sync(); //等待服务器监听端口关闭 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
package bhz.netty.ende1; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //自定义字符串分包符号 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //通过设置每次字符串长度分包 //sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes())); //等待客户端端口关闭 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
package bhz.netty.ende1; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //服务器直接以字符串形式接收到数据 String request = (String)msg; System.out.println("Server :" + msg); String response = "服务器响应:" + msg ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
package bhz.netty.ende1; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String)msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }