基于NIO的服务器陷于写状态有效死循环的原因分析
有一段时间没写博客了,最近在给导师做并行通信的一个程序。在编码过程中发现了一个问题,查阅了很多资料,今天终于知道了原因。
问题描述:
编写基于NIO服务器的时候,客户端向服务端发送一条消息之后,服务端的Selector.select()陷入写有效的死循环中。
在分析问题之前先推荐一些博客和书籍,也是我最近正在拜读和学习的,个人感觉非常的好。
林昊先生的《分布式Java应用基础与实践》
Doug Lea的论文《Scalable IO in Java》
http://www.jdon.com/concurrent/nio.pdf
三石.道的博客:http://www.molotang.com/java
并发编程网的几篇博客:http://ifeve.com/?s=Selector
还有就是一本书《Java NIO》:
http://xxing22657-yahoo-com-cn.iteye.com/blog/899279
好了现在开始分析问题,这个问题是我在拜读Doug Lea的论文《Scalable IO in Java》时,探究Reactor模式的时候发现的。先贴出我写的NIO服务器的和客户端通信的核心代码,有两个类Reactor和Handle:
//Reactor 类 package com.wjy.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class Reactor implements Runnable{ final Selector selector; final ServerSocketChannel serverSocket; final int timeOut=6000; @Override public void run() { // TODO Auto-generated method stub try { while(!Thread.interrupted()){ if(selector.select(timeOut)==0) { System.out.println("."); continue; } Set selected=selector.selectedKeys(); Iterator it=selected.iterator(); while(it.hasNext()){ dispatch((SelectionKey)(it.next())); } selected.clear(); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } void dispatch(SelectionKey k){ Runnable runnable=(Runnable)(k.attachment()); if(runnable!=null){ runnable.run(); } } Reactor(int port) throws IOException{ // TODO Auto-generated constructor stub selector=Selector.open(); serverSocket=ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk=serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } class Acceptor implements Runnable{ public void run(){ try { SocketChannel c=serverSocket.accept(); if(c!=null){ new Handler(selector,c); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } } }
//Handler package com.wjy.server; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; public final class Handler implements Runnable{ final int MAXIN=1024; final int MAXOUT=1024; final SocketChannel socket; final String localCharSetName="gb2312"; final SelectionKey sk; ByteBuffer input=ByteBuffer.allocate(MAXIN); ByteBuffer output=ByteBuffer.allocate(MAXOUT); static final int READING=0,SENDING=1; int state=READING; Handler(Selector sel,SocketChannel c) throws IOException{ socket=c; socket.configureBlocking(false); //这一步很重要 sk=socket.register(sel, 0); sk.attach(this); sk.interestOps(sk.interestOps() | SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete(){ return true; } boolean outputIsComplete(){ return true; } void process(){ input.flip(); try { String receivedString=Charset.forName(localCharSetName).newDecoder().decode(input).toString(); System.out.println("Received: "+receivedString); } catch (CharacterCodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { try { // if (state == READING) read(); // else if (state == SENDING) send(); if(sk.isReadable()) { read(); }else if(sk.isWritable()) { send(); } } catch (IOException ex) { ex.printStackTrace(); } } void read() throws IOException { input.clear(); socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //关键在这一句,有了它selector.select总能选到write请求,会陷入死循环。 sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } void send() throws IOException { String sendString="Hello,Client. I have received your message: "; output.clear(); output=ByteBuffer.wrap(sendString.getBytes(localCharSetName)); socket.write(output); state=READING; sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE)); //这句太重要了,否则进入write的死循环。 // if (outputIsComplete()) // sk.cancel(); } }
我直接将正确的代码贴出来了,原因是这样的:
当用户连接(connect)服务器的时候,激发了Accept。
当用户向服务器写东西的时候,Selecter.select会发现有有效的Readable的key。
我们读取完数据后,注册了SelectionKey.OP_WRITE。
接下来问题出现了,通过断点调试发现Selecter.select()总能返回非0值(其实是1),而且选到的key是isWriteable的。一直死循环下去。
原因:
不应该注册写事件。写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当注册写事件后,写操作一直是就绪的,选择处理线程会占用整个CPU资源。所以,只有当确实有数据要写时再注册写操作,并在写完以后马上取消注册。
解决办法:
服务端读到东西后,注册写事件。等写完东西后取消写事件的注册。
就像这样:sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));