Java NIO——Selector机制解析3(源码分析)
最近一直在看java nio,对其中的selector比较感兴趣,所有就先在网上查了些资料,发现还真有很多人研究过这个,其中尤以皓哥写的比较有意思,也很使我受启发,我也转了他的博客Java NIO——Selector机制解析《转》,但是我一直不明白pipe是如何唤醒selector的,所以又去看了jdk的源码(openjdk下载),整理了如下:
以Java nio自带demo : OperationServer.java OperationClient.java(见附件)
其中server端的核心代码:
- public void initSelector() {
- try {
- selector = SelectorProvider.provider().openSelector();
- this.serverChannel1 = ServerSocketChannel.open();
- serverChannel1.configureBlocking(false);
- InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
- serverChannel1.socket().bind(isa);
- serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
public void initSelector() { try { selector = SelectorProvider.provider().openSelector(); this.serverChannel1 = ServerSocketChannel.open(); serverChannel1.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress("localhost", this.port1); serverChannel1.socket().bind(isa); serverChannel1.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
从头开始,
先看看SelectorProvider.provider()做了什么:
- public static SelectorProvider provider() {
- synchronized (lock) {
- if (provider != null)
- return provider;
- return AccessController.doPrivileged(
- new PrivilegedAction<SelectorProvider>() {
- public SelectorProvider run() {
- if (loadProviderFromProperty())
- return provider;
- if (loadProviderAsService())
- return provider;
- provider = sun.nio.ch.DefaultSelectorProvider.create();
- return provider;
- }
- });
- }
- }
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;而if (provider != null)
returnprovider;
保证了整个server程序中只有一个WindowsSelectorProvider对象;
再看看WindowsSelectorProvider. openSelector():
- public AbstractSelector openSelector() throws IOException {
- return new WindowsSelectorImpl(this);
- }
- new WindowsSelectorImpl(SelectorProvider)代码:
- WindowsSelectorImpl(SelectorProvider sp) throws IOException {
- super(sp);
- pollWrapper = new PollArrayWrapper(INIT_CAP);
- wakeupPipe = Pipe.open();
- wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
- // Disable the Nagle algorithm so that the wakeup is more immediate
- SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
- (sink.sc).socket().setTcpNoDelay(true);
- wakeupSinkFd = ((SelChImpl)sink).getFDVal();
- pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
- }
public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } new WindowsSelectorImpl(SelectorProvider)代码: WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); }
其中Pipe.open()是关键,这个方法的调用过程是:
- public static Pipe open() throws IOException {
- return SelectorProvider.provider().openPipe();
- }
- SelectorProvider 中:
- public Pipe openPipe() throws IOException {
- return new PipeImpl(this);
- }
public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); } SelectorProvider 中: public Pipe openPipe() throws IOException { return new PipeImpl(this); }
再看看怎么new PipeImpl()的:
- PipeImpl(SelectorProvider sp) {
- long pipeFds = IOUtil.makePipe(true);
- int readFd = (int) (pipeFds >>> 32);
- int writeFd = (int) pipeFds;
- FileDescriptor sourcefd = new FileDescriptor();
- IOUtil.setfdVal(sourcefd, readFd);
- source = new SourceChannelImpl(sp, sourcefd);
- FileDescriptor sinkfd = new FileDescriptor();
- IOUtil.setfdVal(sinkfd, writeFd);
- sink = new SinkChannelImpl(sp, sinkfd);
- }
PipeImpl(SelectorProvider sp) { long pipeFds = IOUtil.makePipe(true); int readFd = (int) (pipeFds >>> 32); int writeFd = (int) pipeFds; FileDescriptor sourcefd = new FileDescriptor(); IOUtil.setfdVal(sourcefd, readFd); source = new SourceChannelImpl(sp, sourcefd); FileDescriptor sinkfd = new FileDescriptor(); IOUtil.setfdVal(sinkfd, writeFd); sink = new SinkChannelImpl(sp, sinkfd); }
其中IOUtil.makePipe(true)是个native方法:
/**
* Returns two file descriptors for a pipe encoded in a long.
* The read end of the pipe is returned in the high 32 bits,
* while the write end is returned in the low 32 bits.
*/
staticnativelong makePipe(boolean blocking);
具体实现:
- JNIEXPORT jlong JNICALL
- Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
- {
- int fd[2];
- if (pipe(fd) < 0) {
- JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
- return 0;
- }
- if (blocking == JNI_FALSE) {
- if ((configureBlocking(fd[0], JNI_FALSE) < 0)
- || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
- JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
- close(fd[0]);
- close(fd[1]);
- return 0;
- }
- }
- return ((jlong) fd[0] << 32) | (jlong) fd[1];
- }
- static int
- configureBlocking(int fd, jboolean blocking)
- {
- int flags = fcntl(fd, F_GETFL);
- int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
- return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
- }
JNIEXPORT jlong JNICALL Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking) { int fd[2]; if (pipe(fd) < 0) { JNU_ThrowIOExceptionWithLastError(env, "Pipe failed"); return 0; } if (blocking == JNI_FALSE) { if ((configureBlocking(fd[0], JNI_FALSE) < 0) || (configureBlocking(fd[1], JNI_FALSE) < 0)) { JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); close(fd[0]); close(fd[1]); return 0; } } return ((jlong) fd[0] << 32) | (jlong) fd[1]; } static int configureBlocking(int fd, jboolean blocking) { int flags = fcntl(fd, F_GETFL); int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags); }
正如这段注释:
/**
* Returns two file descriptors for a pipe encoded in a long.
* The read end of the pipe is returned in the high 32 bits,
* while the write end is returned in the low 32 bits.
*/
High32位存放的是通道read端的文件描述符FD(file descriptor),low 32 bits存放的是write端的文件描述符。所以取到makepipe()返回值后要做移位处理。
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
这行代码把返回的pipe的write端的FD放在了pollWrapper中(后面会发现,这么做是为了实现selector的wakeup())
ServerSocketChannel.open()的实现:
- public static ServerSocketChannel open() throws IOException {
- return SelectorProvider.provider().openServerSocketChannel();
- }
- SelectorProvider:
- public ServerSocketChannel openServerSocketChannel() throws IOException {
- return new ServerSocketChannelImpl(this);
- }
public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); } SelectorProvider: public ServerSocketChannel openServerSocketChannel() throws IOException { return new ServerSocketChannelImpl(this); }
可见创建的ServerSocketChannelImpl也有WindowsSelectorImpl的引用。
- ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
- super(sp);
- this.fd = Net.serverSocket(true); //打开一个socket,返回FD
- this.fdVal = IOUtil.fdVal(fd);
- this.state = ST_INUSE;
- }
ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); //打开一个socket,返回FD this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; }
然后通过serverChannel1.register(selector, SelectionKey.OP_ACCEPT);把selector和channel绑定在一起,也就是把new ServerSocketChannel时创建的FD与selector绑定在了一起。
到此,server端已启动完成了,主要创建了以下对象:
WindowsSelectorProvider:单例
WindowsSelectorImpl中包含:
pollWrapper:保存selector上注册的FD,包括pipe的write端FD和ServerSocketChannel所用的FD
wakeupPipe:通道(其实就是两个FD,一个read,一个write)
再到Server 中的run():
selector.select();主要调用了WindowsSelectorImpl中的这个方法:
- protected int doSelect(long timeout) throws IOException {
- if (channelArray == null)
- throw new ClosedSelectorException();
- this.timeout = timeout; // set selector timeout
- processDeregisterQueue();
- if (interruptTriggered) {
- resetWakeupSocket();
- return 0;
- }
- // Calculate number of helper threads needed for poll. If necessary
- // threads are created here and start waiting on startLock
- adjustThreadsCount();
- finishLock.reset(); // reset finishLock
- // Wakeup helper threads, waiting on startLock, so they start polling.
- // Redundant threads will exit here after wakeup.
- startLock.startThreads();
- // do polling in the main thread. Main thread is responsible for
- // first MAX_SELECTABLE_FDS entries in pollArray.
- try {
- begin();
- try {
- subSelector.poll();
- } catch (IOException e) {
- finishLock.setException(e); // Save this exception
- }
- // Main thread is out of poll(). Wakeup others and wait for them
- if (threads.size() > 0)
- finishLock.waitForHelperThreads();
- } finally {
- end();
- }
- // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
- finishLock.checkForException();
- processDeregisterQueue();
- int updated = updateSelectedKeys();
- // Done with poll(). Set wakeupSocket to nonsignaled for the next run.
- resetWakeupSocket();
- return updated;
- }
protected int doSelect(long timeout) throws IOException { if (channelArray == null) throw new ClosedSelectorException(); this.timeout = timeout; // set selector timeout processDeregisterQueue(); if (interruptTriggered) { resetWakeupSocket(); return 0; } // Calculate number of helper threads needed for poll. If necessary // threads are created here and start waiting on startLock adjustThreadsCount(); finishLock.reset(); // reset finishLock // Wakeup helper threads, waiting on startLock, so they start polling. // Redundant threads will exit here after wakeup. startLock.startThreads(); // do polling in the main thread. Main thread is responsible for // first MAX_SELECTABLE_FDS entries in pollArray. try { begin(); try { subSelector.poll(); } catch (IOException e) { finishLock.setException(e); // Save this exception } // Main thread is out of poll(). Wakeup others and wait for them if (threads.size() > 0) finishLock.waitForHelperThreads(); } finally { end(); } // Done with poll(). Set wakeupSocket to nonsignaled for the next run. finishLock.checkForException(); processDeregisterQueue(); int updated = updateSelectedKeys(); // Done with poll(). Set wakeupSocket to nonsignaled for the next run. resetWakeupSocket(); return updated; }
其中subSelector.poll()是核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:
- private int poll() throws IOException{ // poll for the main thread
- return poll0(pollWrapper.pollArrayAddress,
- Math.min(totalChannels, MAX_SELECTABLE_FDS),
- readFds, writeFds, exceptFds, timeout);
- }
- private native int poll0(long pollAddress, int numfds,
- int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
- // These arrays will hold result of native select().
- // The first element of each array is the number of selected sockets.
- // Other elements are file descriptors of selected sockets.
- private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD
- private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD
- private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD
private int poll() throws IOException{ // poll for the main thread return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELECTABLE_FDS), readFds, writeFds, exceptFds, timeout); } private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout); // These arrays will hold result of native select(). // The first element of each array is the number of selected sockets. // Other elements are file descriptors of selected sockets. private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//保存发生read的FD private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生write的FD private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; //保存发生except的FD
这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。比如,由于pollWrapper中保存的也有ServerSocketChannel的FD,所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD发一份数据,也会造成poll0()返回;如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,所有在OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();
这时再来看看WindowsSelectorImpl. Wakeup():
- public Selector wakeup() {
- synchronized (interruptLock) {
- if (!interruptTriggered) {
- setWakeupSocket();
- interruptTriggered = true;
- }
- }
- return this;
- }
- // Sets Windows wakeup socket to a signaled state.
- private void setWakeupSocket() {
- setWakeupSocket0(wakeupSinkFd);
- }
- private native void setWakeupSocket0(int wakeupSinkFd);
- JNIEXPORT void JNICALL
- Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
- jint scoutFd)
- {
- /* Write one byte into the pipe */
- const char byte = 1;
- send(scoutFd, &byte, 1, 0);
- }
public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { setWakeupSocket(); interruptTriggered = true; } } return this; } // Sets Windows wakeup socket to a signaled state. private void setWakeupSocket() { setWakeupSocket0(wakeupSinkFd); } private native void setWakeupSocket0(int wakeupSinkFd); JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this, jint scoutFd) { /* Write one byte into the pipe */ const char byte = 1; send(scoutFd, &byte, 1, 0); }
可见wakeup()是通过pipe的write 端send(scoutFd, &byte, 1, 0),发生一个字节1,来唤醒poll()。所以在需要的时候就可以调用selector.wakeup()来唤醒selector。
源于:http://goon.iteye.com/blog/1775421