Apache mina设立默认的write和read buffer,以及奥秘

Apache mina设置默认的write和read buffer,以及奥秘
最近在做一个项目,用到mina,但是对于mina发送文件,或者报文分包发送有很多不明白的。查看了很多资料,其中找到一位仁兄的发送文件的代码,是一个客户端上传文件到服务器的例子。

作为本文的引子:

客户端程序

package test;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class ClientMain {

	/**
	 * @author daijun ,Nov 26, 2009
	 * @param args
	 * @throws InterruptedException
	 * @throws IOException
	 */
	public static void main(String[] args) throws InterruptedException, IOException {
		try {
			NioSocketConnector connector = new NioSocketConnector();
			DefaultIoFilterChainBuilder chain = connector.getFilterChain();
			connector.setHandler(new FileSenderHandler());
			ConnectFuture connectFuture = connector.connect(new InetSocketAddress("127.0.0.1", 3333));
			IoSession session = null;
			for (;;) {
				try {
					ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 3333));
					future.awaitUninterruptibly();
					session = future.getSession();
					break;
				} catch (RuntimeIoException e) {
					System.err.println("Failed to connect.");
					e.printStackTrace();
					Thread.sleep(5000);
				}
			}
			File f = new File("e:/20100425171.jpg");
			// System.out.println(f.length());
			FileInputStream fin = new FileInputStream(f);
			FileChannel fc = fin.getChannel();
			ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);
			boolean flag = true;
			while (true) {
				// 不间断发送会导致buffer异常
				if (!flag) {
					Thread.sleep(1000);
				}
				bb.clear();
				int i = fc.read(bb);
				System.out.println(i);
				if (i == -1) {
					System.out.println("exit");
					break;
				}
				// 包装成自己的iobuffer
				IoBuffer ib = IoBuffer.wrap(bb);
				bb.flip();
				session.write(ib);
				flag = false;
			}
			session.close(true);
			
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}
}



客户端的handler,其实没啥用,就是那位仁兄的代码

package test;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class FileSenderHandler extends IoHandlerAdapter {
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
	}

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		session.close(true);
	}
	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		System.out.println("hidsda");
		super.messageSent(session, message);
	}

}



Server端代码:
package test;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class ServerMain {

	/**
	 * @author daijun ,Nov 26, 2009
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		SocketAcceptor acceptor = new NioSocketAcceptor();
		DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
		acceptor.setHandler(new FileReceiveHandler());
		acceptor.bind(new InetSocketAddress(3333));
	}

}



Server端Handler:
package test;

import java.io.FileOutputStream;
import java.nio.channels.FileChannel;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

public class FileReceiveHandler extends IoHandlerAdapter {
	private static FileChannel fc;

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		IoBuffer ib = (IoBuffer) message;
		System.out.println(ib.array().length);
		if (fc == null) {
			fc = new FileOutputStream("z:\\copyed.rar").getChannel();
		}
		fc.write(ib.buf());
	}

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		System.out.println("over");
		fc.close();
		session.close(true);
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		System.out.println("session idle");
	}
}



对于mina的普通机制,大家可以网上找资料,这里不重点这里提到。



通过上述的Client代码发送一个大图片文件(我选择了1.6MB左右的文件)
			File f = new File("e:/20100425171.jpg");
			// System.out.println(f.length());
			FileInputStream fin = new FileInputStream(f);
			FileChannel fc = fin.getChannel();
			ByteBuffer bb = ByteBuffer.allocate(2048 * 1000);
			boolean flag = true;
			while (true) {
				// 不间断发送会导致buffer异常
				if (!flag) {
					Thread.sleep(1000);
				}
				bb.clear();
				int i = fc.read(bb);
				System.out.println(i);
				if (i == -1) {
					System.out.println("exit");
					break;
				}
				// 包装成自己的iobuffer
				IoBuffer ib = IoBuffer.wrap(bb);
				bb.flip();
				session.write(ib);
				flag = false;
			}



而服务端的的handler部分
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		IoBuffer ib = (IoBuffer) message;
		System.out.println(ib.array().length);
		if (fc == null) {
			fc = new FileOutputStream("z:\\copyed.rar").getChannel();
		}
		fc.write(ib.buf());
	}


被调用多次,输出结果如下
2048
4096
8192
16384
32768
65536
65536
65536
65536
65536
65536
65536
65536
65536
65536
…………(省略)


才发现Mina对大文件发送有大小限制,如果什么不设置的情况下,如果一个文件的大小超过
65536个字节,将被切片发送。

可以参考mina的类
org.apache.mina.core.session.IoSessionConfig
void setReadBufferSize(int size):
这个方法设置读取缓冲的字节数,但一般不需要调用这个方法,因为IoProcessor 会自动调
整缓冲的大小。你可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,这
样无论IoProcessor 无论如何自动调整,都会在你指定的区间。

此外IoSessionConfig还有一个很重要的方法setMaxReadBufferSize,设置最大读缓存内容
官方解析如下:

Sets the maximum size of the read buffer that I/O processor allocates per each read.  I/O processor will not increase the read buffer size to the greater value than this property value.

翻译:一旦设置 I/O processor读缓冲区(buffer)的最大容量, I/O processor的一次读取缓冲的容量无论怎样增加也不会超过此值(maxReadBufferSize)。


下面做一个实验,对ServerMain的main方法改变如下   :
	public static void main(String[] args) throws IOException {
		SocketAcceptor acceptor = new NioSocketAcceptor();
		DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
		acceptor.setHandler(new FileReceiveHandler());
		acceptor.bind(new InetSocketAddress(3333));
//		acceptor.getSessionConfig().setReceiveBufferSize(1024);
		acceptor.getSessionConfig().setReadBufferSize(1024);
		acceptor.getSessionConfig().setMaxReadBufferSize(8888);
		
		System.out.println(acceptor.getSessionConfig().getReadBufferSize());
	}


增加两行
		acceptor.getSessionConfig().setReadBufferSize(1024);
		acceptor.getSessionConfig().setMaxReadBufferSize(8888);



同样文件,同样操作,可以看到服务器输出如下:
1024
1024
2048
4096
8192
8888
8888
8888
8888
8888
8888
8888
8888
8888
……………………(省略)

上面数据可以看出,读取Buffer的时候,先按照最ReadBufferSize来读取,然后每下一次读取的量都是前一次的两倍,直到与MaxReadBufferSize相同位置。
(mina默认的情况下,最大可以读取缓存大小为65536,最小为64,正常为2048)
请看org.apache.mina.core.session.AbstractIoSessionConfig

    private int minReadBufferSize = 64;
    private int readBufferSize = 2048;
    private int maxReadBufferSize = 65536;
    private int idleTimeForRead;
    private int idleTimeForWrite;
    private int idleTimeForBoth;
    private int writeTimeout = 60;
    private boolean useReadOperation;
    private int throughputCalculationInterval = 3;


对于读缓冲区增加

摘自类org.apache.mina.core.polling.AbstractPollingIoProcessor<T>

的read方法,这里是mina读取字节流的底层,(再往深一层就是rt.jar包提供sun.nio.ch.SocketChannelImpl类中的read(ByteBuffer paramByteBuffer)方法
这里就是mina通信最底层的原型,有兴趣的读者可以自己研读其代码,这里不冗述.)
下面是我对该代码的一些理解,用注释标明
    private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation = session.getTransportMetadata()
                .hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
//读取SocketChannelImp中的数据,然后装进buf中,SocketChannelImp中的数据多少由发送端决定,可以累积.
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
//把读取的数据扔到过滤链中(另外一条线程,异步执行).最后反映到IoHandler中
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
//第一次以ReadBufferSize去读取,每一次增加读取量为前一次的两倍,直到等于MaxReadBufferSize为止
                        session.increaseReadBufferSize();
                    }
                }
            }
            
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class
                                .isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config)
                                .isCloseOnPortUnreachable())

                    scheduleRemove(session);
            }
            
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }


以上注释,说明为什么会每次读取量是前一次的两倍的原因。




下一篇文章还是以本代码解析一下发送端的write的buffer大小的奥秘