Active MQ5.7版在NIO模式下SSL通信的有关问题以及后续版本的解决
Active MQ 5.7版在NIO模式下SSL通信可能会出现BufferUnderFlowException异常。NIO模式下SSL通信的工作类为 juti org.apache.activemq.transport.nio.NIOSSLTransport,数据包传输格式为数据长度dataL +数据内容dataContent,所以基本的命令算法是这样的
1 //得到数据包长度 nextFrameSize = plain.getInt(); 2//为命令分配buffer,总长度为数据包长度 + 4,4是nextFrameSize自身的长度 currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); 3 //从socket buffer中读取数据到命令buffer currentBuffer.putInt(nextFrameSize); // 放入数据包长度 // 如果从socket读取的数据小于当前命令buffer的可用长度,则将socket数据全部读取命令buffer中 if (currentBuffer.remaining() >= plain.remaining()) { currentBuffer.put(plain); } //如果从socket读取的数据大于当前命令buffer的可用长度,则只从socket读取当前命令buffer的可用长度,这样保证读取的数据只属于本命令的数据 else { byte[] fill = new byte[currentBuffer.remaining()]; plain.get(fill); currentBuffer.put(fill); } 4 //处理从socket读取的命令 //如果命令buffer还没有写满,则返回,等待下个数据包 if (currentBuffer.hasRemaining()) { return; } else { //消费数据 nextFrameSize = -1;//当前命令已经读取完毕
以上是5.7版本的处理方式,看起来不错,但是有个致命的问题,就是在第一步,
1 //得到数据包长度
nextFrameSize = plain.getInt();
由于是NIO方式,socket读取的数据不一定完整,这里plain的数据可能不到4个字节,也就是不是一个Int类型,调用plain.getInt();可能会抛出BufferUnderFlowException
在5.8版本对这个问题作了大幅度的处理,思路是这样的:
1 根据数据包长度来判断数据包是否读取完整,如果没有读完,继续读;如果读完了,则处理该数据包;
2 读取分为两种情况,第一次读取还是第n次读取:
3 如果是第一次读,则需要先读取数据包长度,根据数据包长度分配当前buffer大小,然后读取数据包的内容部分;
4 如果是第n次读,则直到把当前buffer写满就可以;
5 在读取数据包长度的时候,要判断数据包是否包含完整的Int类型数据,就是数据包长度是否大于等于32:
如果数据包长度小于32,与前面读取数据内容类似,则读取分为两种情况,第一次读取还是第n次读取:第一次读取需要为当前buffer分配4个字节,也就是32位,读取数据然后返回;第n次读取判断当前buffer是否写满,如果写满了就从从当前buffer中读取Int值
6 读取了数据包长度后构造新的当前buffer,长度为数据内容长度+4,转到步骤4
这样就彻底解决了NIO模式下的问题,下面是5.8版本完整的处理读入数据的代码:
protected void processCommand(ByteBuffer plain) throws Exception { if (nextFrameSize == -1) { if (plain.remaining() < Integer.SIZE) { if (currentBuffer == null) { currentBuffer = ByteBuffer.allocate(4); } while (currentBuffer.hasRemaining() && plain.hasRemaining()) { currentBuffer.put(plain.get()); } if (currentBuffer.hasRemaining()) { return; } else { currentBuffer.flip(); nextFrameSize = currentBuffer.getInt(); } } else { if (currentBuffer != null) { while (currentBuffer.hasRemaining()) { currentBuffer.put(plain.get()); } currentBuffer.flip(); nextFrameSize = currentBuffer.getInt(); } else { nextFrameSize = plain.getInt(); } } if (wireFormat instanceof OpenWireFormat) { long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize(); if (nextFrameSize > maxFrameSize) { throw new IOException("***); } } currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); currentBuffer.putInt(nextFrameSize); } else { if (currentBuffer.remaining() >= plain.remaining()) { currentBuffer.put(plain); } else { byte[] fill = new byte[currentBuffer.remaining()] plain.get(fill); currentBuffer.put(fill); } if (currentBuffer.hasRemaining()) { return; } else { currentBuffer.flip(); Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); doConsume((Command) command); nextFrameSize = -1; currentBuffer = null; } } }