模仿jetty的nio原理例子2(7月10号改进)
仿照jetty的nio原理例子2(7月10号改进)
改动点:
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.selector.accept()方法删除,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略,selector.select(400),避免不停的循环,占用cpu%的情况。
4.每个请求到来之后,直接分出一个线程去处理。
7月10日改进点:
1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口
SimpleJettyServerPlus 这个是server
ConnectionHandler 这个是提交连接事件的
Request这个是附件,放再socketChannel里的附件,包含了请求信息
RequestHandlerl用来提交请求信息
改动点:
1.分成了4个class文件,看起来清晰一点
2.把请求封装成附件,放到socketChannel里面
3.selector.accept()方法删除,取而代之的是selector.selectNow(),并且放到处理注册信息之后。增加了休息策略,selector.select(400),避免不停的循环,占用cpu%的情况。
4.每个请求到来之后,直接分出一个线程去处理。
7月10日改进点:
1.增加了自动删除超时的连接功能
2.key.interestOps操作优化,放到selector线程里面去做
3.request取消了runnable接口
SimpleJettyServerPlus 这个是server
package com.daizuan.jetty.plus; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author daizuan */ public class SimpleJettyServerPlus { private final ConcurrentLinkedQueue<Object> _changes_con = new ConcurrentLinkedQueue<Object>(); private ServerSocketChannel channel; private Selector selector; private int port; private Runnable connectionHandler; private Runnable requestHandler; public SimpleJettyServerPlus(int port) throws IOException{ this.port = port; this.channel = ServerSocketChannel.open(); this.selector = Selector.open(); } public void setConnectionHandler(ConnectionHandler connectionHandler) { this.connectionHandler = connectionHandler; } public void setRequestHandler(RequestHandler requestHandler) { this.requestHandler = requestHandler; } public void listen() throws IOException { // 服务器开始监听端口,提供服务 channel.socket().bind(new InetSocketAddress(port)); // 将scoket榜定在制定的端口上 channel.configureBlocking(true); startConnectionHandler(); startRequestHandler(); } private void startRequestHandler() { if (requestHandler == null) { requestHandler = new RequestHandler(_changes_con, selector); } startThread(requestHandler); } private void startConnectionHandler() { if (connectionHandler == null) { connectionHandler = new ConnectionHandler(_changes_con, channel, selector); } startThread(connectionHandler); } private void startThread(Runnable run) { new Thread(run).start(); } public static void main(String[] args) throws IOException { // System.out.println("server start........."); SimpleJettyServerPlus server = new SimpleJettyServerPlus(6789); server.listen(); // 服务器开始监听端口,提供服务 } }
ConnectionHandler 这个是提交连接事件的
package com.daizuan.jetty.plus; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.ConcurrentLinkedQueue; public class ConnectionHandler implements Runnable { private ConcurrentLinkedQueue<Object> _changes_con; private ServerSocketChannel channel; private Selector selector; public ConnectionHandler(ConcurrentLinkedQueue<Object> _changes_con, ServerSocketChannel channel, Selector selector){ this._changes_con = _changes_con; this.channel = channel; this.selector = selector; } @Override public void run() { System.out.println("ConnectionHander:connection Hander start......"); while (true) { // 分发连接事件 SocketChannel sc = null; try { // 这里阻塞监听连接事件 sc = channel.accept(); sc.configureBlocking(false); _changes_con.add(sc); // 释放selector的锁,以便接收注册信息 selector.wakeup(); System.out.println("listener:a client in![" + sc.socket().getRemoteSocketAddress() + "]"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
Request这个是附件,放再socketChannel里的附件,包含了请求信息
package com.daizuan.jetty.plus; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ConcurrentLinkedQueue; public class Request { private final ConcurrentLinkedQueue<Object> _changes_req = new ConcurrentLinkedQueue<Object>(); private SelectionKey key; Selector selector; private static int DEFAULT_BUFFERSIZE = 16; private static String DEFAULT_CHARSET = "GBK"; private static final String FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final String EXIT = "exit"; private static final int MAX_ZERO_COUNT = 16; private static final long MAX_IDLE_TIME = 60000; private String id; private boolean isDispatched = false; private Runnable _handle = new Runnable() { @Override public void run() { handle(); } }; private volatile long dispatchedTime = 0; private SocketChannel sc; private RequestHandler reqHandler; private int interestOps; public void setReqHandler(RequestHandler reqHandler) { this.reqHandler = reqHandler; } public String getId() { return id; } public void setId(String id) { this.id = id; } public Request(Selector selector, SelectionKey key){ this.key = key; this.selector = selector; this.sc = (SocketChannel) key.channel(); dispatchedTime = System.currentTimeMillis(); } public void addTask(Object o) { _changes_req.add(o); } public void process() { synchronized (this) { if (isDispatched) { System.out.println("I am dispatched ,so return.."); key.interestOps(0); return; } interestOps = key.interestOps(); dispatchedTime = System.currentTimeMillis(); isDispatched = true; new Thread(_handle).start(); } } private void handle() { try { // 解析出请求 String request = parseRequest(); System.out.println("read [" + request + "] from " + id); if (request == null || needToCanncel(request)) { System.out.println(id + "I am die!"); close(); return; } // 向客户端写一些信息 write("[" + getTime() + "] " + request + "\n"); unDispatched(); } catch (Exception e) { e.printStackTrace(); } } private void unDispatched() { synchronized (this) { isDispatched = false; updateKey(); } } /** * 重新设置key,并不做实际更新,仅仅设置,把实际的更新操作放到selector线程里面去做 */ private void updateKey() { synchronized (this) { interestOps = !isDispatched ? SelectionKey.OP_READ : 0; System.out.println("interestOps:" + interestOps + ",SelectionKey.OP_READ:" + SelectionKey.OP_READ); if (key.interestOps() == interestOps) { return; } reqHandler.addChange(this); selector.wakeup(); } } /** * 更新key */ public void doUpdateKey() { synchronized (this) { if (key != null && key.isValid() && sc.isOpen()) { key.interestOps(interestOps); System.out.println("interestOps-->" + interestOps); } else { close(); } } } public void timeOut() { long now = System.currentTimeMillis(); if (now - dispatchedTime > MAX_IDLE_TIME) { close(); } } private String getTime() { DateFormat df = new SimpleDateFormat(FORMAT); return df.format(new Date()); } private boolean needToCanncel(String request) { return EXIT.equals(request); } private String parseRequest() throws IOException { ByteBuffer bbuffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE); int count = 0; int off = 0; byte[] data = new byte[DEFAULT_BUFFERSIZE * 10]; bbuffer.clear(); int zeroCount = 0; // 循环一次性吧所有数据读完,否则可能buffer满了,数据未读完 System.out.println(11111111); while ((count = sc.read(bbuffer)) != -1) { if (count == 0 && ++zeroCount > MAX_ZERO_COUNT) { System.out.println("read zero count:" + zeroCount + ",break"); break; } bbuffer.flip(); if ((off + count) > data.length) { data = grow(data, DEFAULT_BUFFERSIZE * 10); } byte[] buf = bbuffer.array(); System.arraycopy(buf, 0, data, off, count); off += count; } if (count < 0) { return null; } byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return new String(req, DEFAULT_CHARSET).trim(); } private void close() { if (sc != null && sc.socket() != null) { try { if (!sc.socket().isClosed()) { sc.socket().close(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (sc != null) { try { sc.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (key != null) { key.cancel(); } reqHandler.removeReq(this); } private void write(String str) { try { sc.write(ByteBuffer.wrap(str.getBytes(DEFAULT_CHARSET))); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 数组扩容 * * @param src byte[] 源数组数据 * @param size int 扩容的增加量 * @return byte[] 扩容后的数组 */ private byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; } }
RequestHandlerl用来提交请求信息
package com.daizuan.jetty.plus; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; public class RequestHandler implements Runnable { private ConcurrentLinkedQueue<Object> _changes_con; private Selector selector; private static final long MAX_IDLE = 400; private long kickTime = 0; private ConcurrentMap<Request, RequestHandler> requests = new ConcurrentHashMap<Request, RequestHandler>(); public RequestHandler(ConcurrentLinkedQueue<Object> _changes_con, Selector selector){ this._changes_con = _changes_con; this.selector = selector; } @Override public void run() { System.out.println("RequestHander:Request Hander start......"); while (true) { try { int changes = _changes_con.size(); Object change = null; while (changes-- > 0 && (change = _changes_con.poll()) != null) { if (change instanceof SocketChannel) { processCon(change); } else if (change instanceof Request) { ((Request) change).doUpdateKey(); } else { System.out.println("what's this??"); } } int count = selector.selectNow(); if (count == 0) selector.select(MAX_IDLE); Set<SelectionKey> keys = selector.selectedKeys(); // 处理请求信息 for (SelectionKey key : keys) { System.out.println("find some keys " + key); processReq(key); } selector.selectedKeys().clear(); long now = System.currentTimeMillis(); if (now - kickTime > MAX_IDLE) { kickTime = now; kick(); } } catch (Exception e) { e.printStackTrace(); } } } private void processCon(Object change) { try { if (change instanceof SocketChannel) { SocketChannel sc = (SocketChannel) change; String id = "[" + sc.socket().getRemoteSocketAddress() + "] "; SelectionKey key = sc.register(selector, SelectionKey.OP_READ, null); Request req = new Request(selector, key); req.setReqHandler(this); req.setId(id); key.attach(req); requests.put(req, this); req.process(); System.out.println("a client connected!" + id); } } catch (Exception e) { e.printStackTrace(); } } /** * 定时的清除一些超时的连接 */ private void kick() { new Thread(new Runnable() { @Override public void run() { for (Map.Entry<Request, RequestHandler> entry : requests.entrySet()) { entry.getKey().timeOut(); } } } ).start(); } public void removeReq(Request req) { System.out.println("remvoe:" + req); requests.remove(req); } public void addChange(Request req) { System.out.println("add:" + req); this._changes_con.add(req); } private void processReq(SelectionKey key) { if (!key.isValid()) { key.cancel(); Request req = (Request) key.attachment(); if (req != null) req.doUpdateKey(); return; } Request req = (Request) key.attachment(); req.process(); } }
1 楼
zhhzhfya
2012-03-19
你好,我用一个IE访问
ConnectionHandler.java
的
// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]
请帮忙解释下,谢谢
ConnectionHandler.java
的
// 这里阻塞监听连接事件
sc = channel.accept();
这里进行2次accept,我感觉应该一次吧
会出现下面的日志:
ConnectionHander:connection Hander start......
RequestHander:Request Hander start......
listener:a client in![/127.0.0.1:7882]
listener:a client in![/127.0.0.1:7883]
请帮忙解释下,谢谢