Jetty基于NIO的形式处理请求
Jetty基于NIO的方式处理请求
Jetty基于NIO的方式处理请求的类是SelectChannelConnector,该类同样继承AbstractLifeCycle类,SelectChannelConnector初始化的时候会调用AbstractLifeCycle类的start()方法,如下:
public final void start() throws Exception { synchronized (_lock) { try { if (_state == STARTED || _state == STARTING) return; setStarting(); doStart(); Log.debug("started {}",this); setStarted(); } catch (Exception e) { setFailed(e); throw e; } catch (Error e) { setFailed(e); throw e; } } }
doStart()方法在SelectChannelConnector类中.如下:
protected void doStart() throws Exception { _manager.setSelectSets(getAcceptors());//设置接收请求的线程个数,默认1个 _manager.setMaxIdleTime(getMaxIdleTime()); _manager.setLowResourcesConnections(getLowResourcesConnections()); _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); _manager.start();//初始化Selector open();//初始化ServerSocketChannel _manager.register(_acceptChannel); super.doStart(); }
_manager类名为SelectorManager,open()方法如下:
public void open() throws IOException { synchronized(this) { if (_acceptChannel == null) { // Create a new server socket _acceptChannel = ServerSocketChannel.open(); // Bind the server socket to the local host and port _acceptChannel.socket().setReuseAddress(getReuseAddress()); InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); _acceptChannel.socket().bind(addr,getAcceptQueueSize()); // Set to non blocking mode _acceptChannel.configureBlocking(false); } } }
super.doStart()方法如下:
protected void doStart() throws Exception { if (_server==null) throw new IllegalStateException("No server"); // open listener port open();//再一次调用open()方法,确保ServerSocketChannel启动,调用两次就能确保启动? super.doStart(); if (_threadPool==null) _threadPool=_server.getThreadPool(); if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle)) ((LifeCycle)_threadPool).start(); // Start selector thread synchronized(this) { _acceptorThread=new Thread[getAcceptors()]; for (int i=0;i<_acceptorThread.length;i++) { if (!_threadPool.dispatch(new Acceptor(i)))//启动接受请求的线程 { Log.warn("insufficient maxThreads configured for {}",this); break; } } } Log.info("Started {}",this); }
Acceptor线程的run()方法如下:
public void run() { Thread current = Thread.currentThread(); String name; synchronized(AbstractConnector.this)//设置当前线程的名字,是不是太复杂点 { if (_acceptorThread==null) return; _acceptorThread[_acceptor]=current; name =_acceptorThread[_acceptor].getName(); current.setName(name+" - Acceptor"+_acceptor+" "+AbstractConnector.this); } int old_priority=current.getPriority(); try { current.setPriority(old_priority-_acceptorPriorityOffset); while (isRunning() && getConnection()!=null)//connector初始化并且ServerSocketChannel存在 { try { accept(_acceptor);//处理收到的请求 } catch(EofException e) { Log.ignore(e); } catch(IOException e) { Log.ignore(e); } catch(ThreadDeath e) { throw e; } catch(Throwable e) { Log.warn(e); } } } finally { current.setPriority(old_priority); current.setName(name); synchronized(AbstractConnector.this) { if (_acceptorThread!=null) _acceptorThread[_acceptor]=null; } } }
accept(_acceptor)最终会调用SelectorManager.SelectSet.doSelect()方法,该方法比较复杂,简单来说就是每接受一个请求就注册到Selector上,并且用SelectChannelEndPoint类(本身也是一个线程)处理请求,SelectChannelEndPoint类的run()方法如下:
public void run() { try { _connection.handle(); } catch (ClosedChannelException e) { Log.ignore(e); } catch (EofException e) { Log.debug("EOF", e); try{close();} catch(IOException e2){Log.ignore(e2);} } catch (HttpException e) { Log.debug("BAD", e); try{close();} catch(IOException e2){Log.ignore(e2);} } catch (Throwable e) { Log.warn("handle failed", e); try{close();} catch(IOException e2){Log.ignore(e2);} } finally { undispatch(); } }
_connection类的类名为HttpConnection,HttpConnection的handle()方法如下:
public void handle() throws IOException { // Loop while more in buffer boolean more_in_buffer = true; // assume true until proven otherwise int no_progress = 0; while (more_in_buffer) { try { synchronized (this) { if (_handling) throw new IllegalStateException(); // TODO delete this // check _handling = true; } setCurrentConnection(this); long io = 0; Continuation continuation = _request.getContinuation();//得到RetryContinuation if (continuation != null && continuation.isPending()) { Log.debug("resume continuation {}",continuation); if (_request.getMethod() == null) throw new IllegalStateException(); handleRequest();//处理http请求,执行filter,servlet等 } else//解析http请求 { // If we are not ended then parse available if (!_parser.isComplete()) io = _parser.parseAvailable(); // Do we have more generating to do? // Loop here because some writes may take multiple steps and // we need to flush them all before potentially blocking in // the // next loop. while (_generator.isCommitted() && !_generator.isComplete()) { long written = _generator.flush(); io += written; if (written <= 0) break; if (_endp.isBufferingOutput()) _endp.flush(); } // Flush buffers if (_endp.isBufferingOutput()) { _endp.flush(); if (!_endp.isBufferingOutput()) no_progress = 0; } if (io > 0) no_progress = 0; else if (no_progress++ >= 2) return; } } catch (HttpException e) { if (Log.isDebugEnabled()) { Log.debug("uri=" + _uri); Log.debug("fields=" + _requestFields); Log.debug(e); } _generator.sendError(e.getStatus(),e.getReason(),null,true); _parser.reset(true); _endp.close(); throw e; } finally { setCurrentConnection(null); more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput(); synchronized (this) { _handling = false; if (_destroy) { destroy(); return; } } if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput()) { if (!_generator.isPersistent()) { _parser.reset(true); more_in_buffer = false; } if (more_in_buffer) { reset(false); more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput(); } else reset(true); no_progress = 0; } Continuation continuation = _request.getContinuation(); if (continuation != null && continuation.isPending()) { break; }else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO ((SelectChannelEndPoint)_endp).setWritable(false); } } }