Netty客户端与服务端示例
netty客户端与服务端例子
package com.snailteam.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * 服务端Bootstrap * @author * */ public class NServer { static final int PORT = 8080; public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { /** Inbound: 1 ->2 ->3 ->n 顺序处理 Outbound: n ->n-1 ->n-2 .. ->1 逆序处理 */ //new LengthFieldBasedFrameDecoder(1024*8*20, 0, 4,0,4) 最大1024*8*20位为接收数据包,从0,长4Byte是数据宽度,然后从0,长4Byte剔除后的byte数据包,传送 到后面的handler链处理 ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG),new LengthFieldBasedFrameDecoder(1024*8*20, 0, 4,0,4), new NServerHandler()); } }); // Bind and start to accept incoming connections. b.bind(PORT).sync().channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.snailteam.netty;

import java.nio.charset.Charset;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side business handler: prints each received frame and answers with a
 * short acknowledgement derived from the text after the frame's last comma.
 */
public class NServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Charset used for both decoding requests and encoding replies. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Decodes the frame as UTF-8 text, logs it to stdout and writes back a reply
     * made of a fixed prefix plus whatever follows the last {@code ','} in the
     * request (the whole request when it contains no comma).
     *
     * @param ctx channel context used to write the reply
     * @param msg one inbound frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        String str = msg.toString(UTF_8);
        System.out.println("[ok]" + str);
        str = "辛苦了" + str.substring(str.lastIndexOf(',') + 1);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
        // default charset, which would not match how the request was decoded.
        ctx.writeAndFlush(Unpooled.wrappedBuffer(str.getBytes(UTF_8)));
    }
}
package com.snailteam.netty;

import java.nio.charset.StandardCharsets;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Client-side bootstrap: connects to {@link NServer} on localhost and sends
 * 900 length-prefixed text messages, waiting for each write to complete.
 */
public class Nclient {

    /**
     * Runs the client to completion.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // LengthFieldPrepender(4) prefixes every outbound message
                        // with a 4-byte field holding the message length.
                        ch.pipeline().addLast(
                            new LoggingHandler(LogLevel.DEBUG),
                            new LengthFieldPrepender(4),
                            new NclientHandler());
                    }
                });

            // Common payload prefix shared by every message.
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 20; i++) {
                sb.append("中华人民共和国").append(i).append(',');
            }
            String prefix = sb.toString();

            // Connect to the server and wait until the connection is established.
            Channel con = b.connect("localhost", NServer.PORT).sync().channel();
            for (int i = 0; i < 900; i++) {
                String str = prefix + i;
                // Encode as UTF-8 explicitly; the server decodes frames as UTF-8.
                // sync() already blocks until the write completes and rethrows a
                // failure, so no additional Future.get() is needed (get() would
                // also require handling the checked ExecutionException).
                con.writeAndFlush(
                    Unpooled.wrappedBuffer(str.getBytes(StandardCharsets.UTF_8))).sync();
                System.out.println(i);
            }
            // Close the connection and block until the close has completed.
            con.close().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
package com.snailteam.netty;

import java.nio.charset.Charset;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler that echoes whatever the server sends to stdout.
 */
public class NclientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Dumps the stack trace of the failure and then closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Prints the readable bytes of the inbound buffer, decoded as UTF-8 text.
     *
     * @param ctx channel context (not used)
     * @param msg inbound buffer received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        final Charset utf8 = Charset.forName("UTF-8");
        final String text = msg.toString(utf8);
        System.out.println(text);
    }
}
package com.snailteam.netty;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;

/**
 * Minimal hand-written length-prefixed frame decoder: each frame is a 4-byte
 * length (read with {@link ByteBuf#readInt()}) followed by that many payload
 * bytes. The length prefix is not part of the emitted frame.
 */
public class FrameDecoder extends ByteToMessageDecoder {

    /**
     * Minimum number of readable bytes required before the length prefix is
     * read. The prefix is always read with {@code readInt()}, i.e. 4 bytes, so
     * this value is expected to stay at 4.
     */
    int lengthFeildLength = 4;

    /**
     * Emits at most one complete frame per invocation.
     *
     * @param ctx channel context
     * @param in  cumulated inbound bytes
     * @param out receives the decoded frame, if one is complete
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    /**
     * Tries to cut one frame out of {@code in}.
     *
     * @param ctx channel context (not used)
     * @param in  cumulated inbound bytes
     * @return a retained slice holding the payload, or {@code null} when the
     *         prefix or the payload has not fully arrived yet
     * @throws CorruptedFrameException if the length prefix is negative
     */
    private Object decode(ChannelHandlerContext ctx, ByteBuf in) {
        if (in.readableBytes() < lengthFeildLength) {
            return null; // length prefix not complete yet
        }

        int start = in.readerIndex();
        int len = in.readInt(); // payload size announced by the sender

        if (len < 0) {
            // A negative length can never be satisfied; fail with a clear error
            // instead of passing it on to readRetainedSlice.
            throw new CorruptedFrameException("negative frame length: " + len);
        }

        if (in.readableBytes() < len) {
            // Payload incomplete: rewind so the prefix is read again next time.
            in.readerIndex(start);
            return null;
        }

        // Hand over exactly one complete payload.
        return in.readRetainedSlice(len);
    }
}
pom
<!--
  Maven dependencies for the example above.
  NOTE(review): the example code only imports io.netty.* classes; presumably
  netty-example is declared to pull those modules in transitively - confirm.
-->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-example</artifactId>
  <version>4.1.6.Final</version>
  <!-- Exclude the transitive netty-tcnative; it is declared explicitly below. -->
  <exclusions>
    <exclusion>
      <artifactId>netty-tcnative</artifactId>
      <groupId>io.netty</groupId>
    </exclusion>
  </exclusions>
</dependency>
<!--
  netty-tcnative pinned to a specific version and platform classifier.
  NOTE(review): the windows-x86_64 classifier is platform-specific; other
  platforms presumably need a different classifier - confirm.
-->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-tcnative</artifactId>
  <version>1.1.33.Fork23</version>
  <classifier>windows-x86_64</classifier>
</dependency>