Apache mina设立默认的write和read buffer,以及奥秘
Apache mina设置默认的write和read buffer,以及奥秘
最近在做一个项目,用到mina,但是对于mina发送文件,或者报文分包发送有很多不明白的。查看了很多资料,其中找到一位仁兄的发送文件的代码,是一个客户端上传文件到服务器的例子。
作为本文的引子:
客户端程序
客户端的handler,其实没啥用,就是那位仁兄的代码
Server端代码:
Server端Handler:
对于mina的普通机制,大家可以网上找资料,这里不重点这里提到。
通过上述的Client代码发送一个大图片文件(我选择了1.6MB左右的文件)
而服务端的的handler部分
被调用多次,输出结果如下
2048
4096
8192
16384
32768
65536
65536
65536
65536
65536
65536
65536
65536
65536
65536
…………(省略)
才发现Mina对大文件发送有大小限制,如果什么不设置的情况下,如果一个文件的大小超过
65536个字节,将被切片发送。
可以参考mina的类
org.apache.mina.core.session.IoSessionConfig
void setReadBufferSize(int size):
这个方法设置读取缓冲的字节数,但一般不需要调用这个方法,因为IoProcessor 会自动调
整缓冲的大小。你可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,这
样无论IoProcessor 无论如何自动调整,都会在你指定的区间。
此外IoSessionConfig还有一个很重要的方法setMaxReadBufferSize,设置最大读缓存内容
官方解析如下:
Sets the maximum size of the read buffer that I/O processor allocates per each read. I/O processor will not increase the read buffer size to the greater value than this property value.
翻译:一旦设置 I/O processor读缓冲区(buffer)的最大容量, I/O processor的一次读取缓冲的容量无论怎样增加也不会超过此值(maxReadBufferSize)。
下面做一个实验,对ServerMain的main方法改变如下 :
增加两行
同样文件,同样操作,可以看到服务器输出如下:
1024
1024
2048
4096
8192
8888
8888
8888
8888
8888
8888
8888
8888
8888
……………………(省略)
上面数据可以看出,读取Buffer的时候,先按照最ReadBufferSize来读取,然后每下一次读取的量都是前一次的两倍,直到与MaxReadBufferSize相同位置。
(mina默认的情况下,最大可以读取缓存大小为65536,最小为64,正常为2048)
请看org.apache.mina.core.session.AbstractIoSessionConfig
对于读缓冲区增加
摘自类org.apache.mina.core.polling.AbstractPollingIoProcessor<T>
的read方法,这里是mina读取字节流的底层,(再往深一层就是rt.jar包提供sun.nio.ch.SocketChannelImpl类中的read(ByteBuffer paramByteBuffer)方法
这里就是mina通信最底层的原型,有兴趣的读者可以自己研读其代码,这里不冗述.)
下面是我对该代码的一些理解,用注释标明
以上注释,说明为什么会每次读取量是前一次的两倍的原因。
下一篇文章还是以本代码解析一下发送端的write的buffer大小的奥秘
最近在做一个项目,用到mina,但是对于mina发送文件,或者报文分包发送有很多不明白的。查看了很多资料,其中找到一位仁兄的发送文件的代码,是一个客户端上传文件到服务器的例子。
作为本文的引子:
客户端程序
package test; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import org.apache.mina.core.RuntimeIoException; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.session.IoSession; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class ClientMain { /** * @author daijun ,Nov 26, 2009 * @param args * @throws InterruptedException * @throws IOException */ public static void main(String[] args) throws InterruptedException, IOException { try { NioSocketConnector connector = new NioSocketConnector(); DefaultIoFilterChainBuilder chain = connector.getFilterChain(); connector.setHandler(new FileSenderHandler()); ConnectFuture connectFuture = connector.connect(new InetSocketAddress("127.0.0.1", 3333)); IoSession session = null; for (;;) { try { ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 3333)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { System.err.println("Failed to connect."); e.printStackTrace(); Thread.sleep(5000); } } File f = new File("e:/20100425171.jpg"); // System.out.println(f.length()); FileInputStream fin = new FileInputStream(f); FileChannel fc = fin.getChannel(); ByteBuffer bb = ByteBuffer.allocate(2048 * 1000); boolean flag = true; while (true) { // 不间断发送会导致buffer异常 if (!flag) { Thread.sleep(1000); } bb.clear(); int i = fc.read(bb); System.out.println(i); if (i == -1) { System.out.println("exit"); break; } // 包装成自己的iobuffer IoBuffer ib = IoBuffer.wrap(bb); bb.flip(); session.write(ib); flag = false; } session.close(true); } catch (Throwable e) { e.printStackTrace(); } } }
客户端的handler,其实没啥用,就是那位仁兄的代码
package test; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; public class FileSenderHandler extends IoHandlerAdapter { @Override public void messageReceived(IoSession session, Object message) throws Exception { } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.close(true); } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("hidsda"); super.messageSent(session, message); } }
Server端代码:
package test; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.transport.socket.SocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class ServerMain { /** * @author daijun ,Nov 26, 2009 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { SocketAcceptor acceptor = new NioSocketAcceptor(); DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); acceptor.setHandler(new FileReceiveHandler()); acceptor.bind(new InetSocketAddress(3333)); } }
Server端Handler:
package test; import java.io.FileOutputStream; import java.nio.channels.FileChannel; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; public class FileReceiveHandler extends IoHandlerAdapter { private static FileChannel fc; @Override public void messageReceived(IoSession session, Object message) throws Exception { IoBuffer ib = (IoBuffer) message; System.out.println(ib.array().length); if (fc == null) { fc = new FileOutputStream("z:\\copyed.rar").getChannel(); } fc.write(ib.buf()); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println("over"); fc.close(); session.close(true); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { System.out.println("session idle"); } }
对于mina的普通机制,大家可以网上找资料,这里不重点这里提到。
通过上述的Client代码发送一个大图片文件(我选择了1.6MB左右的文件)
File f = new File("e:/20100425171.jpg"); // System.out.println(f.length()); FileInputStream fin = new FileInputStream(f); FileChannel fc = fin.getChannel(); ByteBuffer bb = ByteBuffer.allocate(2048 * 1000); boolean flag = true; while (true) { // 不间断发送会导致buffer异常 if (!flag) { Thread.sleep(1000); } bb.clear(); int i = fc.read(bb); System.out.println(i); if (i == -1) { System.out.println("exit"); break; } // 包装成自己的iobuffer IoBuffer ib = IoBuffer.wrap(bb); bb.flip(); session.write(ib); flag = false; }
而服务端的的handler部分
@Override public void messageReceived(IoSession session, Object message) throws Exception { IoBuffer ib = (IoBuffer) message; System.out.println(ib.array().length); if (fc == null) { fc = new FileOutputStream("z:\\copyed.rar").getChannel(); } fc.write(ib.buf()); }
被调用多次,输出结果如下
2048
4096
8192
16384
32768
65536
65536
65536
65536
65536
65536
65536
65536
65536
65536
…………(省略)
才发现Mina对大文件发送有大小限制,如果什么不设置的情况下,如果一个文件的大小超过
65536个字节,将被切片发送。
可以参考mina的类
org.apache.mina.core.session.IoSessionConfig
void setReadBufferSize(int size):
这个方法设置读取缓冲的字节数,但一般不需要调用这个方法,因为IoProcessor 会自动调
整缓冲的大小。你可以调用setMinReadBufferSize()、setMaxReadBufferSize()方法,这
样无论IoProcessor 无论如何自动调整,都会在你指定的区间。
此外IoSessionConfig还有一个很重要的方法setMaxReadBufferSize,设置最大读缓存内容
官方解析如下:
Sets the maximum size of the read buffer that I/O processor allocates per each read. I/O processor will not increase the read buffer size to the greater value than this property value.
翻译:一旦设置 I/O processor读缓冲区(buffer)的最大容量, I/O processor的一次读取缓冲的容量无论怎样增加也不会超过此值(maxReadBufferSize)。
下面做一个实验,对ServerMain的main方法改变如下 :
public static void main(String[] args) throws IOException { SocketAcceptor acceptor = new NioSocketAcceptor(); DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); acceptor.setHandler(new FileReceiveHandler()); acceptor.bind(new InetSocketAddress(3333)); // acceptor.getSessionConfig().setReceiveBufferSize(1024); acceptor.getSessionConfig().setReadBufferSize(1024); acceptor.getSessionConfig().setMaxReadBufferSize(8888); System.out.println(acceptor.getSessionConfig().getReadBufferSize()); }
增加两行
acceptor.getSessionConfig().setReadBufferSize(1024); acceptor.getSessionConfig().setMaxReadBufferSize(8888);
同样文件,同样操作,可以看到服务器输出如下:
1024
1024
2048
4096
8192
8888
8888
8888
8888
8888
8888
8888
8888
8888
……………………(省略)
上面数据可以看出,读取Buffer的时候,先按照最ReadBufferSize来读取,然后每下一次读取的量都是前一次的两倍,直到与MaxReadBufferSize相同位置。
(mina默认的情况下,最大可以读取缓存大小为65536,最小为64,正常为2048)
请看org.apache.mina.core.session.AbstractIoSessionConfig
private int minReadBufferSize = 64; private int readBufferSize = 2048; private int maxReadBufferSize = 65536; private int idleTimeForRead; private int idleTimeForWrite; private int idleTimeForBoth; private int writeTimeout = 60; private boolean useReadOperation; private int throughputCalculationInterval = 3;
对于读缓冲区增加
摘自类org.apache.mina.core.polling.AbstractPollingIoProcessor<T>
的read方法,这里是mina读取字节流的底层,(再往深一层就是rt.jar包提供sun.nio.ch.SocketChannelImpl类中的read(ByteBuffer paramByteBuffer)方法
这里就是mina通信最底层的原型,有兴趣的读者可以自己研读其代码,这里不冗述.)
下面是我对该代码的一些理解,用注释标明
private void read(T session) { IoSessionConfig config = session.getConfig(); IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize()); final boolean hasFragmentation = session.getTransportMetadata() .hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { //读取SocketChannelImp中的数据,然后装进buf中,SocketChannelImp中的数据多少由发送端决定,可以累积. while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); //把读取的数据扔到过滤链中(另外一条线程,异步执行).最后反映到IoHandler中 filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { //第一次以ReadBufferSize去读取,每一次增加读取量为前一次的两倍,直到等于MaxReadBufferSize为止 session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class .isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config) .isCloseOnPortUnreachable()) scheduleRemove(session); } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }
以上注释,说明为什么会每次读取量是前一次的两倍的原因。
下一篇文章还是以本代码解析一下发送端的write的buffer大小的奥秘