The NIO Reactor Pattern (Notes from Reading About NIO)


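Note: all content here is excerpted from the PPT document in the attachment.
1. General structure of network services:
Read request ---> decode request ---> process service ---> encode reply ---> send reply
The classic service design is "one thread per request", in which each handler runs in its own thread (illustrated by a figure in the source slides).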
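For comparison with the Reactor design, the thread-per-request approach can be sketched roughly as follows. This is a minimal sketch rather than code from the slides: the class name `ThreadPerConnectionServer`, the upper-casing `process` step, and the 1024-byte buffer are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Classic blocking design: every accepted connection gets its own thread. */
class ThreadPerConnectionServer implements Runnable {
    private final ServerSocket serverSocket;

    ThreadPerConnectionServer(int port) throws IOException {
        serverSocket = new ServerSocket(port); // port 0 lets the OS pick a free port
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    void close() throws IOException {
        serverSocket.close();
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();    // blocks until a client connects
                new Thread(() -> handle(socket)).start(); // one thread per connection
            }
        } catch (IOException e) {
            // accept() fails once the server socket is closed; stop the loop
        }
    }

    /** Hypothetical service step (an assumption): upper-cases the request text. */
    static String process(String request) {
        return request.toUpperCase();
    }

    private static void handle(Socket socket) {
        try (Socket s = socket) {
            InputStream in = s.getInputStream();
            OutputStream out = s.getOutputStream();
            byte[] buf = new byte[1024];
            int n = in.read(buf);                                            // read request (blocking)
            if (n <= 0) {
                return;
            }
            String request = new String(buf, 0, n, StandardCharsets.UTF_8); // decode request
            String response = process(request);                             // process service
            out.write(response.getBytes(StandardCharsets.UTF_8));           // encode and send reply
            out.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```

Each blocking call ties up a whole thread while it waits, which is the cost the Reactor pattern tries to avoid.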
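2. The Reactor pattern
The Reactor responds to I/O events by dispatching them to the appropriate Handler.
Handlers perform non-blocking actions.
Basic Reactor design, single-threaded version
Example code: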
package com.zhang.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        // Register for accept events and attach the Acceptor as their handler.
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        key.attach(new Acceptor());
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select(); // blocks until at least one channel is ready
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    dispatch(selectionKey);
                }
                selectionKeys.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Run whatever handler is attached to the ready key.
    private void dispatch(SelectionKey selectionKey) {
        Runnable run = (Runnable) selectionKey.attachment();
        if (run != null) {
            run.run();
        }
    }

    // Accepts new connections and creates a Handler for each one.
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = serverSocketChannel.accept();
                if (channel != null) {
                    new Handler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.zhang.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class Handler implements Runnable {

    private static final int DEFAULT_SIZE = 8092;
    private static final int READING = 0;
    private static final int SENDING = 1;

    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;
    private int state = READING;

    ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
    ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);

    public Handler(Selector selector, SocketChannel channel) throws IOException {
        this.socketChannel = channel;
        socketChannel.configureBlocking(false);
        // Register with no interest ops first, then attach this handler and ask for reads.
        this.selectionKey = socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    // State-based dispatch: used for as long as this Handler is the key's attachment.
    @Override
    public void run() {
        if (state == READING) {
            read();
        } else if (state == SENDING) {
            write();
        }
    }

    // State-object variant: once attached to the key, the Sender is dispatched
    // instead of the Handler, so write() above is no longer reached.
    class Sender implements Runnable {
        @Override
        public void run() {
            try {
                socketChannel.write(outputBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (outIsComplete()) {
                selectionKey.cancel();
            }
        }
    }

    private void write() {
        try {
            socketChannel.write(outputBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Was "while", which would never terminate once outIsComplete() returned true.
        if (outIsComplete()) {
            selectionKey.cancel();
        }
    }

    private void read() {
        try {
            socketChannel.read(inputBuffer);
            if (inputIsComplete()) {
                process();
                state = SENDING; // keep the state field consistent with the attached Sender
                selectionKey.attach(new Sender());
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                selectionKey.selector().wakeup();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Placeholder: application-specific check that a full request has arrived.
    public boolean inputIsComplete() {
        return false;
    }

    // Placeholder: application-specific check that the full response has been sent.
    public boolean outIsComplete() {
        return false;
    }

    // Placeholder: decode inputBuffer, run the service, and fill outputBuffer.
    public void process() {
    }
}
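Features supported by NIO:
- Channels: connections to files, sockets, and so on that support non-blocking reads.
- Buffers: array-like objects that Channels can read from and write to directly.
- Selectors: tell which of a set of Channels have I/O events.
- SelectionKeys: maintain I/O event status and bindings.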
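The four building blocks listed above can be seen working together in a small in-process example. The sketch below is an illustration only: it uses a `java.nio.channels.Pipe` instead of a socket so that it runs without a network, and the class name `NioBuildingBlocks` and method `roundTrip` are hypothetical names chosen here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class NioBuildingBlocks {

    /**
     * Writes a short message into a Pipe and reads it back through a Selector.
     * Intended for short messages only: the sink is left in blocking mode, so a
     * message larger than the pipe's internal buffer would stall the write.
     */
    static String roundTrip(String message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {

            // Buffer: wraps the bytes to send.
            ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            ByteBuffer in = ByteBuffer.allocate(out.capacity());

            // Channel + SelectionKey: non-blocking source, registered for reads,
            // with the input buffer bound to the key as its attachment.
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ, in);

            while (out.hasRemaining()) {
                sink.write(out);
            }

            // Selector: reports which registered channels are ready.
            while (in.hasRemaining()) {
                selector.select();
                for (SelectionKey ready : selector.selectedKeys()) {
                    if (ready.isReadable()) {
                        ReadableByteChannel channel = (ReadableByteChannel) ready.channel();
                        channel.read((ByteBuffer) ready.attachment());
                    }
                }
                selector.selectedKeys().clear();
            }

            in.flip(); // switch the buffer from filling to draining
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello reactor"));
    }
}
```

Attaching the buffer to the key mirrors how the Reactor example attaches a handler to each key: the key carries whatever state is needed when its channel becomes ready.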
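Multithreaded design
- Add threads to improve scalability; this mainly applies to multi-core processors.
- Worker threads: Reactors should trigger handlers quickly, and handler processing slows the Reactor down. Move non-I/O processing to other threads.
- Multiple Reactor threads: a single Reactor thread can saturate doing I/O. Distribute the load across other reactors, load-balancing to match CPU and I/O rates.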
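The worker-thread idea above can be sketched without any sockets. In the sketch below, the names `PooledHandler`, `inputComplete`, and the `readyToSend` callback are hypothetical; the callback stands in for what a real handler would do on its key (`interestOps(SelectionKey.OP_WRITE)` followed by `selector.wakeup()`), and reversing the string is only a placeholder for real processing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PooledHandler {

    enum State { READING, PROCESSING, SENDING }

    private final ExecutorService workers;
    private final Runnable readyToSend; // assumption: stands in for interestOps(OP_WRITE) + wakeup()
    private State state = State.READING;
    private String request;
    private String response;

    PooledHandler(ExecutorService workers, Runnable readyToSend) {
        this.workers = workers;
        this.readyToSend = readyToSend;
    }

    /** Called on the Reactor thread once a complete request has been read. */
    synchronized void inputComplete(String request) {
        this.request = request;
        state = State.PROCESSING;
        workers.execute(this::processAndHandOff); // returns at once, so the Reactor is not blocked
    }

    /** Runs on a worker thread: does the non-I/O work, then hands back to the Reactor. */
    private synchronized void processAndHandOff() {
        response = new StringBuilder(request).reverse().toString(); // placeholder for process()
        state = State.SENDING;
        readyToSend.run();
    }

    synchronized State state() {
        return state;
    }

    synchronized String response() {
        return response;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(1);
        PooledHandler handler = new PooledHandler(pool, done::countDown);
        handler.inputComplete("request");
        done.await(5, TimeUnit.SECONDS);
        System.out.println(handler.state() + " " + handler.response());
        pool.shutdown();
    }
}
```

The methods are synchronized because the Reactor thread and a worker thread both touch the handler's state.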
Using multiple Reactors
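One way to arrange several reactors is sketched below, under simplifying assumptions: the main reactor is reduced to a blocking accept loop, connections are handed to sub-reactors round-robin, and each sub-reactor simply echoes what it reads. The class names `SubReactor` and `MultiReactorServer` and the method `adopt` are hypothetical, and the write loop ignores back-pressure for brevity.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** A sub-reactor: owns one Selector and serves the connections handed to it. */
class SubReactor implements Runnable {
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final ByteBuffer buffer = ByteBuffer.allocate(1024);

    SubReactor() throws IOException {
        selector = Selector.open();
    }

    /** Called from the accepting thread; registration happens on this reactor's own thread. */
    void adopt(SocketChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                SocketChannel channel;
                while ((channel = pending.poll()) != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isValid() && key.isReadable()) {
                        echo((SocketChannel) key.channel());
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void echo(SocketChannel channel) {
        try {
            buffer.clear();
            if (channel.read(buffer) < 0) { // peer closed the connection
                channel.close();
                return;
            }
            buffer.flip();
            while (buffer.hasRemaining()) { // simplification: spins if the send buffer is full
                channel.write(buffer);
            }
        } catch (IOException e) {
            try { channel.close(); } catch (IOException ignored) { }
        }
    }
}

/** Accepts connections and distributes them across the sub-reactors in turn. */
class MultiReactorServer implements Runnable {
    private final ServerSocketChannel server;
    private final SubReactor[] subReactors;
    private int next = 0;

    MultiReactorServer(int port, int subReactorCount) throws IOException {
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port)); // port 0 lets the OS pick a free port
        subReactors = new SubReactor[subReactorCount];
        for (int i = 0; i < subReactorCount; i++) {
            subReactors[i] = new SubReactor();
            Thread thread = new Thread(subReactors[i], "sub-reactor-" + i);
            thread.setDaemon(true);
            thread.start();
        }
    }

    int port() {
        return server.socket().getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                SocketChannel channel = server.accept(); // blocking accept
                subReactors[next].adopt(channel);
                next = (next + 1) % subReactors.length;  // round-robin
            }
        } catch (IOException e) {
            // the server channel was closed; stop accepting
        }
    }
}
```

New channels are queued and registered on the sub-reactor's own thread, followed by `wakeup()`, so that registration never contends with a `select()` call in progress on another thread.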