JAVA NIO跟MINA发送数据过程解析
NIO发送数据过程:
1 将信道写入操作加锁保证其他线程不对信道写入(文档中称“是如果另一个线程已经在此通道上发起了一个写入操作,则在该操作完成前此方法的调用被阻塞。”)
2 如果缓冲区为非直接缓冲区,则复制缓冲区内容到直接缓冲区,防止外界对缓冲区内容修改导致发送数据损坏
* 复制过程分配的内存将被捆绑在线程上,在线程关闭之前,这部分内存不被回收,等IO操作完成后,可重用这部分内存,减少内存分配过程,其性能消耗更多在复制内容上。
* 直接缓冲区与非直接缓冲区参见ByteBuffer类文档
3 通过JNI调用本地方法直接将缓冲区地址和长度传递给操作系统,由系统进行异步IO操作
由此可见jvm与操作系统共用直接缓冲区。
MINA发送数据过程:
MINA发送数据过程在
org.apache.mina.core.polling.AbstractPollingIoProcessor.flushNow(T, long)
org.apache.mina.core.polling.AbstractPollingIoProcessor.writeBuffer(T, WriteRequest, boolean, int, long)
org.apache.mina.transport.socket.nio.NioProcessor.write(NioSession, IoBuffer, int)
方法中。
flashNow方法负责清空IoSession.writeRequestQueue列队。为保证传输效率,flashNow方法尽量将输出长度匹配到接收缓冲区长度的1.5倍。
flashNow方法循环调用writeBuffer,直到所调用writeBuffer方法返回值总和大于等于接收缓冲区容量的1.5倍、writeBuffer返回0或全部消息发送成功。
writeBuffer方法负责发送一条消息数据,write方法返回0,writeBuffer将连续尝试256次。
如果消息完整发送,将调用IoFilterChain.fireMessageSent(req)方法。如果有数据发送,将返回实际发送数据长度,否则返回0。
* 如果连接支持碎片拼接(如TCP连接),则writeBuffer方法可以决定输出消息的一部分,保证输出数据长度不超过输入缓冲区的1.5倍。
* 如果连接不支持碎片拼接(如UDP连接),则writterBuffer方法必须将消息完整输出。
write方法调用信道的write方法,将IoBuffer包装的ByteBuffer放入操作系统缓冲区,并返回结果。
由此可见,在基本NIO中,如果系统底层缓冲区填满,SocketChannel.write方法将返回0,业务逻辑必须处理此情况,否则将丢失发送数据。
在MINA中,使用消息分批或拆分方式将最大效率利用系统缓冲区,并保证由于系统缓冲区装满而未发送的消息将重新发送。
相关代码:
sun.nio.ch.SocketChannelImpl.write(ByteBuffer)关键代码:
.
.
.
for (;;) {
n = IOUtil.write(fd, buf, -1, nd, writeLock);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
}
.
.
.
sun.nio.ch.IOUtil.write(FileDescriptor, ByteBuffer[], int, int, NativeDispatcher) 关键代码:
.
.
.
IOVecWrapper vec = IOVecWrapper.get(length);
.
.
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
hadow.put(buf);
shadow.flip();
vec.setShadow(iov_len, shadow);
.
.
.
long bytesWritten = nd.writev(fd, vec.address, iov_len);
SocketDispatcher.writev:
long writev(FileDescriptor fd, long address, int len) throws IOException {
return writev0(fd, address, len);
}
特殊返回值定义在:
final class IOStatus {
static final int EOF = -1; // End of file
static final int UNAVAILABLE = -2; // Nothing available (non-blocking)
static final int INTERRUPTED = -3; // System call interrupted
static final int UNSUPPORTED = -4; // Operation not supported
static final int THROWN = -5; // Exception thrown in JNI code
static final int UNSUPPORTED_CASE = -6; // This case not supported
static int normalize(int n) {
if (n == UNAVAILABLE)
return 0;
return n;
}
}
windows下JNI调用:
Java_sun_nio_ch_SocketDispatcher_writev0(JNIEnv *env, jclass clazz,
jobject fdo, jlong address, jint len)
{
/* set up */
int i = 0;
DWORD written = 0;
jint fd = fdval(env, fdo);
struct iovec *iovp = (struct iovec *)address;
WSABUF *bufs = malloc(len * sizeof(WSABUF));
if (bufs == 0) {
JNU_ThrowOutOfMemoryError(env, 0);
return IOS_THROWN;
}
if ((isNT() == JNI_FALSE) && (len > 16)) {
len = 16;
}
/* copy iovec into WSABUF */
for(i=0; i<len; i++) {
bufs[i].buf = (char *)iovp[i].iov_base;
bufs[i].len = (u_long)iovp[i].iov_len;
}
/* read into the buffers */
i = WSASend((SOCKET)fd, /* Socket */
bufs, /* pointers to the buffers */
(DWORD)len, /* number of buffers to process */
&written, /* receives number of bytes written */
0, /* no flags */
0, /* no overlapped sockets */
0); /* no completion routine */
/* clean up */
free(bufs);
if (i != 0) {
int theErr = (jint)WSAGetLastError();
if (theErr == WSAEWOULDBLOCK) {
return IOS_UNAVAILABLE;
}
JNU_ThrowIOExceptionWithLastError(env, "Vector write failed");
return IOS_THROWN;
}
return convertLongReturnVal(env, (jlong)written, JNI_FALSE);
}
windows本地接口
int WSASend (
SOCKET s,
LPWSABUF lpBuffers
DWORD dwBufferCount,
LPDWORD lpNumberOfBytesSent,
DWORD dwFlags,
LPWSAOVERLAPPED lpOverlapped,
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
返回值WSAEWOULDBLOCK Windows NT: Overlapped sockets: There are too many outstanding overlapped I/O requests. Nonoverlapped sockets: The socket is marked as nonblocking and the send operation cannot be completed immediately.