Active MQ5.7版在NIO模式下SSL通信的有关问题以及后续版本的解决

Active MQ5.7版在NIO模式下SSL通信的问题以及后续版本的解决

Active MQ 5.7版在NIO模式下SSL通信可能会出现BufferUnderFlowException异常。NIO模式下SSL通信的工作类为 juti org.apache.activemq.transport.nio.NIOSSLTransport,数据包传输格式为数据长度dataL +数据内容dataContent,所以基本的命令算法是这样的

    

1 //得到数据包长度
nextFrameSize = plain.getInt();

2//为命令分配buffer,总长度为数据包长度 + 4,4是nextFrameSize自身的长度
currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); 

3 //从socket buffer中读取数据到命令buffer
  currentBuffer.putInt(nextFrameSize); // 放入数据包长度

// 如果从socket读取的数据小于当前命令buffer的可用长度,则将socket数据全部读取命令buffer中        if (currentBuffer.remaining() >= plain.remaining()) {
            currentBuffer.put(plain);
       }

//如果从socket读取的数据大于当前命令buffer的可用长度,则只从socket读取当前命令buffer的可用长度,这样保证读取的数据只属于本命令的数据
	 else {
               byte[] fill = new byte[currentBuffer.remaining()];
               plain.get(fill);
              currentBuffer.put(fill);
         }
4 //处理从socket读取的命令        
//如果命令buffer还没有写满,则返回,等待下个数据包
     if (currentBuffer.hasRemaining()) {
            return;
     } else {      
          //消费数据
            nextFrameSize = -1;//当前命令已经读取完毕

 

以上是5.7版本的处理方式,看起来不错,但是有个致命的问题,就是在第一步,

1 //得到数据包长度

nextFrameSize = plain.getInt();

 

由于是NIO方式,socket读取的数据不一定完整,这里plain的数据可能不到4个字节,也就是不是一个Int类型,调用plain.getInt();可能会抛出BufferUnderFlowException

 

5.8版本对这个问题作了大幅度的处理,思路是这样的:

1 根据数据包长度来判断数据包是否读取完整,如果没有读完,继续读;如果读完了,则处理该数据包;

2 读取分为两种情况,第一次读取还是第n次读取:

3 如果是第一次读,则需要先读取数据包长度,根据数据包长度分配当前buffer大小,然后读取数据包的内容部分;

4 如果是n次读,则直到把当前buffer写满就可以;

5 在读取数据包长度的时候,要判断数据包是否包含完整的Int类型数据,就是数据包长度是否大于等于32

如果数据包长度小于32,与前面读取数据内容类似,则读取分为两种情况,第一次读取还是第n次读取:第一次读取需要为当前buffer分配4个字节,也就是32位,读取数据然后返回;第n次读取判断当前buffer是否写满,如果写满了就从从当前buffer中读取Int

6 读取了数据包长度后构造新的当前buffer,长度为数据内容长度+4,转到步骤4

 

这样就彻底解决了NIO模式下的问题,下面是5.8版本完整的处理读入数据的代码:

 

protected void  processCommand(ByteBuffer plain) throws Exception {
        if (nextFrameSize == -1) {
            if (plain.remaining() < Integer.SIZE) {
                if (currentBuffer == null) {
                    currentBuffer = ByteBuffer.allocate(4);
                }
       while (currentBuffer.hasRemaining() && plain.hasRemaining()) {                    
                currentBuffer.put(plain.get());
                }
              if (currentBuffer.hasRemaining()) {
                    return;
                } else {
                    currentBuffer.flip();
                    nextFrameSize = currentBuffer.getInt();
                }
            } else {
                if (currentBuffer != null) {
                    while (currentBuffer.hasRemaining()) {
                        currentBuffer.put(plain.get());
                    }
                    currentBuffer.flip();
                    nextFrameSize = currentBuffer.getInt();
                } else {
                    nextFrameSize = plain.getInt();
                }
            }
            if (wireFormat instanceof OpenWireFormat) {
long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
                if (nextFrameSize > maxFrameSize) {
throw new IOException("***);
                }
            }
            currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
            currentBuffer.putInt(nextFrameSize); 
        } else {
            if (currentBuffer.remaining() >= plain.remaining()) {
                currentBuffer.put(plain); 
            } else {
                byte[] fill = new byte[currentBuffer.remaining()] 
                plain.get(fill);
                currentBuffer.put(fill);
            }
            if (currentBuffer.hasRemaining()) {
                return;
            } else {
                currentBuffer.flip(); Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
                doConsume((Command) command);
                nextFrameSize = -1;
                currentBuffer = null;
            }
        } 
    }