基于Netty框架的JT809上司平台验证实现
基于Netty框架的JT809上级平台验证实现
基于netty3的JT809协议验证实现。程序可以接收下级平台发送的业务报文,并转换成POJO对象。程序只处理了两个业务类型:下级平台登录、主链路连接保持消息。没有实现从链路的建立。
数据流:下级平台-->LoggingHandler-->Decoder-->Message-->BusiHandler-->Message-->Encoder-->LoggingHandler-->下级平台。
由于是技术验证,代码中的一些值是写死的。
基于netty3的JT809协议验证实现。程序可以接收下级平台发送的业务报文,并转换成POJO对象。程序只处理了两个业务类型:下级平台登录、主链路连接保持消息。没有实现从链路的建立。
数据流:下级平台-->LoggingHandler-->Decoder-->Message-->BusiHandler-->Message-->Encoder-->LoggingHandler-->下级平台。
- LoggingHandler是Netty框架提供的日志Handler,可以在控制台显示出站、入站的数据,方便调试。
- Decoder负责报文数据的完整性处理,并转换字节流为报文POJO对象。
- Message为自定义的JT809协议POJO对象
- BusiHandler为业务处理类
- Encoder负责将报文POJO对象拼装成ChannelBuffer对象传输
- Util.crc16是从网上找的CRC校验方法
由于是技术验证,代码中的一些值是写死的。
package test809; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.logging.LoggingHandler; import org.jboss.netty.logging.InternalLogLevel; public class JTServer { public static void main(String args[]) { ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("loging", new LoggingHandler(InternalLogLevel.INFO)); pipeline.addLast("decoder", new Decoder()); pipeline.addLast("encoder", new Encoder()); pipeline.addLast("busiHandler", new BusiHandler()); return pipeline; } }); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.bind(new InetSocketAddress(8001)); System.out.println("JTServer startup ...."); } }
package test809; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class Decoder extends FrameDecoder{ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { // TODO Auto-generated method stub int head = buffer.getByte(0); int tail = buffer.getByte(buffer.capacity()-1); System.out.println(buffer.getByte(buffer.capacity()-1)); System.out.println(buffer.getByte(0)); if(!(head == Message.MSG_HEAD && tail == Message.MSG_TAIL)){ return null; } //跳过头标识 buffer.skipBytes(1); //读取报文长度(目前有不一致情况) Message msg = this.buildMessage(buffer); return msg; } private Message buildMessage(ChannelBuffer buffer){ Message msg = new Message(); //读取报文长度(目前有不一致情况) msg.setMsgLength(buffer.readUnsignedInt()); msg.setMsgSn(buffer.readInt());//4 byte msg.setMsgId(buffer.readUnsignedShort());//2 byte msg.setMsgGesscenterId(buffer.readUnsignedInt());//4 byte msg.setVersionFlag(buffer.readBytes(3).array());//3 byte msg.setEncryptFlag(buffer.readUnsignedByte());//1 byte msg.setEncryptKey(buffer.readUnsignedInt());//4 byte ChannelBuffer bodyBytes = buffer.readBytes(buffer.readableBytes()-2-1);//数据体为变长字节 msg.setMsgBody(bodyBytes); msg.setCrcCode(buffer.readUnsignedShort());//2 byte //跳过尾标识 buffer.skipBytes(1);//1 byte //buffer.readByte(); System.out.println("after build message readable bytes:"+buffer.readableBytes()); return msg; } }
package test809; import java.nio.charset.Charset; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class BusiHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { // TODO Auto-generated method stub Message msg = (Message) e.getMessage(); switch (msg.getMsgId()) { case 0x1001: login(msg, ctx, e); break; case 0x1005: System.out.println("主链路连接保持请求消息。"); heartBeat(msg, ctx, e); break; default: break; } } private void login(Message msg, ChannelHandlerContext ctx, MessageEvent e) { int userId = msg.getMsgBody().readInt(); String passWord = msg.getMsgBody().readBytes(8).toString(Charset.forName("GBK")); String ip = msg.getMsgBody().readBytes(32).toString(Charset.forName("GBK")); int port = msg.getMsgBody().readUnsignedShort(); msg.getMsgBody().clear(); System.out.println(userId); System.out.println(passWord); System.out.println(ip); System.out.println(port); Message msgRep = new Message(0x1002); ChannelBuffer buffer = ChannelBuffers.buffer(5); buffer.writeByte(0x00); //校验码,临时写死 buffer.writeInt(1111); msgRep.setMsgBody(buffer); ChannelFuture f = e.getChannel().write(msgRep); // f.addListener(ChannelFutureListener.CLOSE); } private void heartBeat(Message msg, ChannelHandlerContext ctx, MessageEvent e){ Message msgRep = new Message(0x1006); ChannelBuffer buffer = ChannelBuffers.buffer(0); msgRep.setMsgBody(buffer); ChannelFuture f = e.getChannel().write(msgRep); } }
package test809; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class Encoder extends SimpleChannelHandler{ @Override public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { // TODO Auto-generated method stub ChannelBuffer buffer = buildMessage((Message) e.getMessage());; if(buffer != null){ Channels.write(ctx, e.getFuture(), buffer); } } /** * 生成下行报文 * @param msg * @return */ private ChannelBuffer buildMessage(Message msg){ int bodyLength = msg.getMsgBody().capacity(); ChannelBuffer buffer = ChannelBuffers.buffer(bodyLength+Message.MSG_FIX_LENGTH); buffer.writeByte(Message.MSG_HEAD); //1 //--------------数据头---------- buffer.writeInt(buffer.capacity()); //4 buffer.writeInt(msg.getMsgSn()); //4 buffer.writeShort(msg.getMsgId()); //2 buffer.writeInt(1); //4 buffer.writeBytes(msg.getVersionFlag());//3 buffer.writeByte(0);//1 buffer.writeInt(20000000);//4 //--------------数据体---------- buffer.writeBytes(msg.getMsgBody()); //------------crc校验码--------- byte[] b = ChannelBuffers.buffer(bodyLength+22).array(); buffer.getBytes(1, b); int crcValue = Util.crc16(b); buffer.writeShort(crcValue);//2 buffer.writeByte(Message.MSG_TAIL);//1 System.out.println("before send :"+buffer); return buffer; } }
package test809; import org.jboss.netty.buffer.ChannelBuffer; public class Message { public static final int MSG_HEAD = 0x5b; public static final int MSG_TAIL = 0x5d; //报文中除数据体外,固定的数据长度 public static final int MSG_FIX_LENGTH = 26; //报文序列号,自增。 private static int internalMsgNo = 0; private long msgLength, encryptFlag=1, msgGesscenterId, encryptKey; private int crcCode,msgId,msgSn; private ChannelBuffer msgBody; private byte[] versionFlag = {0,0,1}; //下行报文标识,值为1时,代表发送的数据;默认为0,代表接收的报文 //private int downFlag = 0; public Message(){} public Message(int msgId){ //下行报文需要填充报文序列号 synchronized((Integer)internalMsgNo) { if(internalMsgNo == Integer.MAX_VALUE){ internalMsgNo = 0; } } this.msgSn = ++internalMsgNo; this.msgId = msgId; //this.downFlag = 1; } public int getCrcCode() { return crcCode; } public void setCrcCode(int crcCode) { this.crcCode = crcCode; } public byte[] getVersionFlag() { return versionFlag; } public void setVersionFlag(byte[] versionFlagBytes) { this.versionFlag = versionFlagBytes; } public long getMsgLength() { return msgLength; } public void setMsgLength(long msgLength) { this.msgLength = msgLength; } public long getEncryptFlag() { return encryptFlag; } public void setEncryptFlag(int encryptFlag) { this.encryptFlag = encryptFlag; } public int getMsgSn() { return msgSn; } public void setMsgSn(int msgSn) { this.msgSn = msgSn; } public int getMsgId() { return msgId; } public void setMsgId(int msgId) { this.msgId = msgId; } public long getMsgGesscenterId() { return msgGesscenterId; } public void setMsgGesscenterId(long msgGesscenterId) { this.msgGesscenterId = msgGesscenterId; } public long getEncryptKey() { return encryptKey; } public void setEncryptKey(long encryptKey) { this.encryptKey = encryptKey; } public ChannelBuffer getMsgBody() { return msgBody; } public void setMsgBody(ChannelBuffer msgBody) { System.out.println("0x"+Integer.toHexString(this.msgId)+": "+msgBody.capacity()+" 字节数据体."); this.msgBody = msgBody; } @Override public String toString() { // TODO Auto-generated method stub return ("MSG_ID:" + this.msgId + " --> " + this.msgBody + " version flag:" + this.versionFlag + " encryptKey:" + this.encryptKey + " crcCode:" + this.crcCode+" msgGesscenterId:"+this.msgGesscenterId); } }
package test809; public class Util { public static int crc16(byte[]... bytesArr) { int b = 0; int crc = 0xffff; for (byte[] d : bytesArr) { for (int i = 0; i < d.length; i++) { for (int j = 0; j < 8; j++) { b = ((d[i] << j) & 0x80) ^ ((crc & 0x8000) >> 8); crc <<= 1; if (b != 0) crc ^= 0x1021; } } } crc = ~crc; return crc; } }