Jetty 1. 描述 2. AbstractConnector 3. ServerConnector源码解读 4. SelectorManager类 5. EndPoint和Connection

基于Jetty-9.4.8.v20171121。

Connector接受远程机器的连接和数据,允许应用向远程机器发送数据。

1.2 类图

从类图看出AbstractConnector继承ContainerLifeCycle,所以具有Container和LifeCycle特性。

此外有一个ServerConnector,这个是整个Jetty中很重要的连接器,目前该连接器负责HTTP和HTTPS协议等连接。

ConnectionFactory负责为连接器创建连接对象,不同的连接(HTTP)创建不同的连接对象。

Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection

1.3 API能力

主要都是一些getter方法,获取该连接器相关的信息。

@ManagedObject("Connector Interface")
public interface Connector extends LifeCycle, Container, Graceful
{
    // 与这个连接器关联的服务器
    public Server getServer();

    // 返回执行任务的执行器
    public Executor getExecutor();

    // 返回调度任务的调度器
    public Scheduler getScheduler();

    // 数据缓冲区
    public ByteBufferPool getByteBufferPool();

    // 返回与协议名称对应的ConnectionFactory对象
    public ConnectionFactory getConnectionFactory(String nextProtocol);
    

    public <T> T getConnectionFactory(Class<T> factoryType);
    
    // 返回默认ConnectionFactory对象
    public ConnectionFactory getDefaultConnectionFactory();
    // 返回所有Connection工厂
    public Collection<ConnectionFactory> getConnectionFactories();
    
    public List<String> getProtocols();
    
    // 返回最大空闲连接时间
    @ManagedAttribute("maximum time a connection can be idle before being closed (in ms)")
    public long getIdleTimeout();

    // 返回这个对象底层的socket,channel,buffer等
    public Object getTransport();
    
    /**
     * @return immutable collection of connected endpoints
     */
    // 返回连接端的不可变集合
    public Collection<EndPoint> getConnectedEndPoints();

    public String getName();
}

2. AbstractConnector

2.1 描述

AbstractConnector利用ConnectionFactory工厂机制为不同协议(HTTP,SSL等)创建Connection实例。  

AbstractConnector管理着连接器必须的几个基本服务:

(1)Executor:Executor服务用于运行该连接器所需的所有活动任务,(例如接受连接,处理HTTP请求),默认使用Server.getThreadPool作为Executor;

(2)Scheduler:调度器服务用于监视所有连接的空闲超时,并且也可用于监控连接时间,例如异步请求超时,默认使用ScheduledExecutorScheduler实例;

(3)ByteBufferPool:ByteBufferPool服务提供给所有连接,用于从池中获取和释放ByteBuffer实例。

这些服务作为bean被Container管理,可以是托管或未托管。

连接器有一个ConnectionFactory集合,每个ConnectionFactory有对应的协议名称。协议名称可以是现实的协议比如https/1.1或http2,甚至可以是私有协议名称。

比如SSL-http/1.1标示SslConnectionFactory,它是由HttpConnectionFactory实例化并且作为HttpConnectionFactory下一个协议。

ConnectionFactory集合可以通过构造函数注入,通过addConnectionFactory,removeConnectionFactory和setConnectionFactories修改。每个协议名称只能对应一个ConnectionFactory实例,如果两个ConnectionFactory对应一个协议名称,那么第二个将替换第一个。

最新ConnectionFactory通过setDefaultProtocol方法设置,或第一次配置的协议工厂。

每个ConnectionFactory类型负责它所接受的协议配置。为了配置HTTP协议,你需要传递HttpConfiguration实例到HttpConnectionFactory(或者其他支持HTTP的ConnectionFactory);相似地,SslConnectionFactory需要SslContextFactory对象和下一个协议名称。

(1)ConnectionFactory可以简单创建Connection对象去支持特定协议。比如HttpConnectionFactory能创建HttpConnection处理http/1.1,http/1.0和http/0.9;

(2)ConnectionFactory也可以通过其他ConnectionFactory创建一系列Connection对象。比如SslConnectionFactory配置了下一个协议名称,一旦接受了请求创建了SslConnection对象。然后可以通过连接器的getConnectionFactory获取下一个ConnectionFactory,这个ConnectionFactory产生的Connection可以处理从SslConnection获取的未加密的数据;

(3)ConnectionFactory也可以创建一个临时Connection,用于在连接上交换数据,以确定下一个使用的协议。例如,ALPN(Application Layer Protocol Negotiation)协议是SSL的扩展,允许在SSL握手期间指定协议,ALPN用于HTTP2在客户端与服务器之间通信协商协议。接受一个HTTP2连接,连接器会配置SSL-ALPN, h2,http/1.1。一个新接受的连接使用“SSL-ALPN”,它指定一个带有“ALPN”的SSLConnectionFactory作为下一个协议。因此,一个SSL连接实例被链接到一个ALPN连接实例。ALPN然后与客户端协商下一个协议,可能是http2,或则http/1.1。一旦决定了下一个协议,ALPN连接调用getConnectionFactory创建连接实例,然后替换ALPN连接。

Connector在运行中会重复调用accept(int)方法,acceptor任务运行在一个循环中。

accept方法实现必须满足如下几点:

(1)阻塞等待新连接;

(2)接受连接(比如socket accept);

(3)配置连接;

(4)调用getDefaultConnectionFactory->ConnectionFactory.newConnection去创建一个新Connection。

acceptor默认的数量是1,数量可以是CPU的个数除以8。更多的acceptor可以减少服务器延迟而且可以获得一个高速的连接(比如http/1.0没有keepalive)。对于现代持久连接协议(http/1.1,http/2)默认值是足够的。

2.2 类图

Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection

(1)实现Connector接口,实现基本的连接器能力;

(2)继承ContainerLifeCycle类,具备容器化和生命周期能力;

(3)AbstractConnector有一个内部类Acceptor;

(4)ConnectionFactory工厂接口,实现类有HttpConnectionFactory,SslConnectionFactory,针对不同的连接类型如HTTP,HTTPS产生不同的工厂创建不同的产品(Connection),这是典型的工厂方法模式;

2.3 AbstractConnector源码解读

AbstractConnector实现Connector接口,而Connector接口提供的都是getter方法,都比较简单,一般都是直接返回某个对象;

AbstractConnector继承ContainerLifeCycle类,所以具有LifeCycle特性,有启动停止动作;

2.3.1 字段描述

    // 锁对象,设置Accepting,获取ConnectionFactory等操作
    // Locker类封装了ReentrantLock类
    private final Locker _locker = new Locker();
    private final Condition _setAccepting = _locker.newCondition();
    // ConnectionFactory缓存,key为协议名称,value为ConnectionFactory实例
    private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); 
    // 与连接器对应的server
    private final Server _server;
    // 下面executor,scheduler,byteBufferPool是Connector必须的组件,Connector接口的几个getter方法就是返回这些对象。
    private final Executor _executor;
    private final Scheduler _scheduler;
    private final ByteBufferPool _byteBufferPool;
    // 接受连接的线程数组
    private final Thread[] _acceptors;
    // 已经建立的对端连接对象
    private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
    // Connector停止时,需要把Thread[] _acceptors停止
    // _stopping = new CountDownLatch(_acceptors.length)
    private CountDownLatch _stopping;
// 默认空闲连接时间 private long _idleTimeout = 30000; // 默认协议名称,对应ConnectionFactory private String _defaultProtocol; // 默认ConnectionFactory工厂 private ConnectionFactory _defaultConnectionFactory; private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; private ThreadPoolBudget.Lease _lease;

2.3.2 构造函数和addConnectionFactory

 从构造函数来看,已经初始化了如下字段:

(1)_factories

(2)_defaultProtocol

(3)_defaultConnectionFactory

(4)_server

(5)_executor

(6)_scheduler

(7)_byteBufferPool

(8)_acceptors 

// 唯一的构造函数 
public AbstractConnector(
            Server server,
            Executor executor,
            Scheduler scheduler,
            ByteBufferPool pool,
            int acceptors,
            ConnectionFactory... factories)
    {
        _server=server;
        // 如果Executor为null,则获取Server的
        _executor=executor!=null?executor:_server.getThreadPool();
        if (scheduler==null)
            scheduler=_server.getBean(Scheduler.class);
        _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
        if (pool==null)
            pool=_server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();

        addBean(_server,false);
        addBean(_executor);
        if (executor==null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // 缓存ConnectionFactory,如果没有设置,则为HttpConnectionFactory
        for (ConnectionFactory factory:factories) 
            addConnectionFactory(factory);

        int cores = Runtime.getRuntime().availableProcessors();
        if (acceptors < 0)
            acceptors=Math.max(1, Math.min(4,cores/8));
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
       // 设置acceptor进程数组
        _acceptors = new Thread[acceptors];
    }

调用addConnectionFactory方法可以缓存所有的ConnectionFactory:

 public void addConnectionFactory(ConnectionFactory factory)
    {
        if (isRunning())
            throw new IllegalStateException(getState());

        // 需要移除的ConnectionFactory
        Set<ConnectionFactory> to_remove = new HashSet<>();
        for (String key:factory.getProtocols())
        {
            key=StringUtil.asciiToLowerCase(key);
            ConnectionFactory old=_factories.remove(key); // 先移除协议名称对应的ConnectionFactory对象
            if (old!=null)
            {
                if (old.getProtocol().equals(_defaultProtocol))
                    _defaultProtocol=null;
                to_remove.add(old); // 保存待移除
            }
            _factories.put(key, factory); // 增加新的ConnectionFactory
        }

         // keep factories still referenced
         // 避免一种场景:如果_factories里面已经缓存了HttpConnectionFactory,对应的协议名称为http/1.1
         // 然后增加的factory也是HttpConnectionFactory对应的协议名称为http/1.1经过上面的操作,to_remove里面有HttpConnectionFactory这样下面removeBean的时候会误删
        for (ConnectionFactory f : _factories.values())
            to_remove.remove(f);

        // remove old factories
        for (ConnectionFactory old: to_remove)
        {
            removeBean(old);
            if (LOG.isDebugEnabled())
                LOG.debug("{} removed {}", this, old);
        }

        // add new Bean
        addBean(factory);
        if (_defaultProtocol==null)
            _defaultProtocol=factory.getProtocol();
        if (LOG.isDebugEnabled())
            LOG.debug("{} added {}", this, factory);
    }

   

2.3.3 doStart

在Jetty中,服务器对象Server在启动的时候会启动该服务器管理的所有Connector,从所有Connector的继承关系中可以看出AbstractConnector对象是所有具体Connector对象的父类。 

        // start connectors last
        for (Connector connector : _connectors)
        {
            try
            {  
                connector.start(); // 启动Connector对象
            }
            catch(Throwable e)
            {
                mex.add(e);
            }
        }  

下面具体看一下AbstractConnector.doStart的具体实现。

 @Override
    protected void doStart() throws Exception
    {   // 一个Connector至少有一个ConnectionFactory对象
        if(_defaultProtocol==null)
            throw new IllegalStateException("No default protocol for "+this);
        _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
        if(_defaultConnectionFactory==null)
            throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
        // 如果是SslConnectionFactory,则必须要有下一个协议ConnectionFactory
        SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class);
        if (ssl != null)
        {
            String next = ssl.getNextProtocol();
            ConnectionFactory cf = getConnectionFactory(next);
            if (cf == null)
                throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
        }

        _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length);
        super.doStart();
        // 设置所有acceptor线程停止的同步器
        _stopping=new CountDownLatch(_acceptors.length);
        for (int i = 0; i < _acceptors.length; i++)
        {
            Acceptor a = new Acceptor(i);
            addBean(a);
            getExecutor().execute(a); // 启动接受器
        }

        LOG.info("Started {}", this);
    }  

2.3.4 Acceptor

Acceptor接收器负责接受连接且本身是一个线程。

@Override
        public void run()
        {  // (1)设置线程名称与优先级
            final Thread thread = Thread.currentThread();
            String name=thread.getName();
            _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
            thread.setName(_name);

            int priority=thread.getPriority();
            if (_acceptorPriorityDelta!=0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));

            _acceptors[_id] = thread;

            try
            {
                while (isRunning()) // (2)循环接受请求
                {
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await(); // 如果_accepting==false则阻塞,等待通知
                            continue;
                        }
                    }
                    catch (InterruptedException e) 
                    {
                        continue;
                    }
                    
                    try
                    {
                        accept(_id); // (3)接受请求,这个是核心方法
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
                thread.setName(name);
                if (_acceptorPriorityDelta!=0)
                    thread.setPriority(priority);

                synchronized (AbstractConnector.this) // Why?
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping=_stopping; 
                if (stopping!=null)
                    stopping.countDown(); // 如果线程异常,则设置线程同步器减一
            }
        }  

在Acceptor的run方法里面有个accept(int acceptorID)是AbstractConnector新增的抽象方法,负责处理连接请求。

到此AbstractConnector类的主要方法基本已经分析完毕,下面紧接着分析accept方法的实现,重点关注ServerConnector类。

3. ServerConnector源码解读

ServerConnector主要用于TCP/IP连接,使用不同的ConnectionFactory实例,它可以直接或通过SSL接受HTTP、HTTP/2和WebSocket的连接。

ServerConnector是一个基于NIO的完全异步实现。连接器必需的服务(Executor,Scheduler等)默认使用传入的Server实例;也可以通过构造函数注入;

各种重载的构造函数用于ConnectionFactory的配置。如果没有设置ConnectionFactory,构造函数将默认使用HttpConnectionFactory。SslContextFactory可以实例化SslConnectionFactory。

连接器会使用Executor执行许多Selector任务,这些任务使用NIO的Selector异步调度accepted连接。selector线程将调用通过EndPoint.fillInterested(Callback)或EndPoint.write(Callback,ByteBuffer)传入的Callback的方法。

这些回调可以执行一些非阻塞的IO工作,但总是会向Executor服务发送任何阻塞、长时间运行或应用程序任务。Selector默认的数量是JVM可用核数的一半。

3.1 字段描述

3.2 承接上面2.3.4小结介绍ServerConnector.accept(int accpetorID)

     // 参数acceptorID其实没有使用 
    @Override
    public void accept(int acceptorID) throws IOException
    {   // 获取与该连接器对应的ServerSocketChannel,该类是java.nio
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept(); // java api
            accepted(channel); // 接受具体的SocketChannel
        }
    }  

在调用accepted方法之后具体的处理将交给SelectorManager类,ServerConnector使用的是ServerConnectorManager类。

4. SelectorManager类

4.1 类图

Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection

(1)SelectorManager通过管理ManagedSelector对象简化JVM原始提供的java.nio非阻塞操作;SelectorManager子类实现方法返回协议指定的EndPoint和Connection对象。

(2) ManagedSelector包装Selector简化在channel上面的非阻塞操作;ManagedSelector运行在select循环中并且阻塞在Selector.select()方法直到注册channel事件发生,当有事件发生,它负责通知与当前channel相关的EndPoint。

4.2 accept方法

    public void accept(SelectableChannel channel)
    {
        accept(channel, null);
    }

     // 注册Channel执行非阻塞读写操作
    public void accept(SelectableChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector(channel); // 选择ManagedSelector对象
        // Accept是一个Runnable子类,submit提交线程池运行
        selector.submit(selector.new Accept(channel, attachment));
    }
     
    private ManagedSelector chooseSelector(SelectableChannel channel)
    {  
        return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)];
    }
    

4.3 ManagedSelectopr.Accept类

// 线程可执行类
class Accept extends Invocable.NonBlocking implements Closeable
    {
        private final SelectableChannel channel;
        private final Object attachment;

        Accept(SelectableChannel channel, Object attachment)
        {
            this.channel = channel;
            this.attachment = attachment;
        }

        @Override
        public void close()
        {
            LOG.debug("closed accept of {}", channel);
            closeNoExceptions(channel);
        }

        @Override
        public void run()
        {
            try
            {
                final SelectionKey key = channel.register(_selector, 0, attachment);
                // 创建EndPoint对象,然后提交线程池执行
                submit(new CreateEndPoint(channel, key));
            }
            catch (Throwable x)
            {
                closeNoExceptions(channel);
                LOG.debug(x);
            }
        }
    }

submit方法就是把线程任务提交到Queue<Runnable> _actions = new ArrayDeque<>();队列中。

然后每个ManagedSelector在doStart里面启动线程池不断从_actions队列中获取任务执行,主要涉及的类有EatWhatYouKill,ManagedSelector.SelectorProducer等。

Jetty是基于Handler处理各种请求,下面重点分析如何调用最终的Handler处理器。

5. EndPoint和Connection

5.1 类图

Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection                  Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection

紧接着4.3中的CreateEndPoint类,调用createEndPoint方法,

private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
    {
        // endPoint的实际类型是SocketChannelEndPoint对象
        EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
         // 如果未设置ConnectionFactory,则默认是HttpConnectionFactory,连接对象类型是HttpConnection,即connection类型是HttpConnection
        Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
        endPoint.setConnection(connection);
        selectionKey.attach(endPoint);
        // 当EndPoint打开,回调该方法
        endPoint.onOpen();
        // 当EndPoint打开,回调该方法,同时将endPoint保存在AbstractConnector的Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());里面
        _selectorManager.endPointOpened(endPoint);
        // 内部调用HttpConnection.onOpen()
        _selectorManager.connectionOpened(connection);
        if (LOG.isDebugEnabled())
            LOG.debug("Created {}", endPoint);
    }  

下面看看HttpConnection.onOpen()方法实现:

     // HttpConnection
    @Override
    public void onOpen()
    {
        super.onOpen(); // 通知Listeners
        fillInterested();
    }

    // AbstractConnection
    public void fillInterested()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("fillInterested {}",this);
         // SocketChannelEndPoint类型
        getEndPoint().fillInterested(_readCallback);
    }

    // AbstractEndPoint
    @Override
    public void fillInterested(Callback callback)
    {
        notIdle();
        _fillInterest.register(callback); // 最终调用SocketChannelEndPoint父类ChannelEndPoint.needsFillInterest
    }  

最后补充一个业务调用栈:

 Jetty
1. 描述
2. AbstractConnector
3. ServerConnector源码解读
4. SelectorManager类
5. EndPoint和Connection