zk客户端的ClientCnxn类

ClientCnxn是客户端的类:该类管理zk客户端的socket io,维持一个可用服务器的列表。

// ClientCnxn class (field excerpt)
// Packets queued by calling threads that have not been sent yet.
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); // waiting to be sent
// Packets that have been sent and are waiting for the server's response.
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); // sent, awaiting response
// Thread that performs the socket IO (sending requests and receiving responses).
final SendThread sendThread;
// Thread that handles events and callbacks.
final EventThread eventThread;

Packet封装了请求、响应以及回调等。

/**
 * Bundles one client request with its reply slot, callback and watch registration
 * while it travels through the outgoing and pending queues.
 */
static class Packet {
    // Other code omitted in this excerpt.
    // Header of the outgoing request; doIO assigns its xid before the packet is serialized.
    RequestHeader requestHeader;
    // Header that readResponse fills in with the xid, error code and zxid of the reply.
    ReplyHeader replyHeader;
    // Request body.
    Record request;
    // Response body; readResponse deserializes into it when the reply carries no error.
    Record response;
    // Serialized form written to the socket; null until createBB() has been called.
    ByteBuffer bb;
    // NOTE(review): presumably the path as seen by the caller (without chroot) — not used in this excerpt.
    String clientPath;
    // NOTE(review): presumably the chroot-prefixed path sent to the server — not used in this excerpt.
    String serverPath;
    // Set to true by finishPacket; submitRequest waits on the packet until it is set.
    boolean finished;
    // Asynchronous callback; when non-null, finishPacket hands the packet to the event thread.
    AsyncCallback cb;
    // NOTE(review): presumably the opaque context passed back to cb — confirm.
    Object ctx;
    // Watch to register once the reply arrives; finishPacket passes it the reply's error code.
    WatchRegistration watchRegistration;
    // NOTE(review): meaning not visible in this excerpt — presumably read-only-mode related; confirm.
    public boolean readOnly;

    /**
     * Creates a packet with {@code readOnly} set to {@code false}.
     */
    Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
           Record request, Record response,
           WatchRegistration watchRegistration) {
        this(requestHeader, replyHeader, request, response,
             watchRegistration, false);
    }

    /**
     * Creates a packet from the given request/reply parts; all other fields keep their defaults.
     */
    Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
           Record request, Record response,
           WatchRegistration watchRegistration, boolean readOnly) {
        this.requestHeader = requestHeader;
        this.replyHeader = replyHeader;
        this.request = request;
        this.response = response;
        this.readOnly = readOnly;
        this.watchRegistration = watchRegistration;
    }
}

ClientCnxn类中有SendThread和EventThread两个线程,SendThread负责io(发送和接收),EventThread负责事件处理。

Packet在outgoingQueue和pendingQueue之间流转:

首先,调用线程把Packet放入outgoingQueue;SendThread从outgoingQueue中取出Packet并发送,发送完成后把该Packet移入pendingQueue等待响应(ping和auth类型的Packet不进入pendingQueue);当响应到达时,SendThread从pendingQueue队首取出对应的Packet,解析响应,并唤醒阻塞中的调用线程。

示例代码:

public class MyZK {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 60000000, new MyWatcher());
        
        List<ACL> list = new ArrayList<ACL>(1);
        list.add(new ACL(Perms.ALL, Ids.ANYONE_ID_UNSAFE));
        //CreateResponse
        String create = zk.create("/zhang/hello", "hello".getBytes(), list, CreateMode.PERSISTENT);
        System.out.println(create);
        
        //GetDataResponse        
// 添加了watch。删除该节点的时候,服务端会发通知
byte[] data = zk.getData("/zhang/hello", true, null); System.out.println(new String(data)); Stat stat = zk.exists("/zhang/hello", false); System.out.println(stat); zk.delete("/zhang/hello", -1); } static class MyWatcher implements Watcher { public void process(WatchedEvent event) { System.out.println(event); } } }

getData 调用栈:

(此处原为getData调用栈的示意图,图片未随文本保留。)

// ZooKeeper.getData
/**
 * Fetches the data of the node at {@code path} with a blocking request.
 *
 * @param path    client-side path of the node; validated before anything is sent
 * @param watcher if non-null, registered as a data watch on the node
 * @param stat    if non-null, receives a copy of the node's stat from the response
 * @return the data carried by the response
 * @throws KeeperException      if the reply header carries a non-zero error code
 * @throws InterruptedException if interrupted while waiting for the reply
 */
public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // The watch is keyed by the client-visible path, i.e. without the chroot prefix.
    final boolean watching = watcher != null;
    WatchRegistration registration =
            watching ? new DataWatchRegistration(watcher, clientPath) : null;

    // The request itself carries the chroot-prefixed path.
    final String serverPath = prependChroot(clientPath);

    RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getData);

    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watching);

    GetDataResponse response = new GetDataResponse();

    // Hands the request to the connection and blocks until the reply has arrived.
    ReplyHeader reply = cnxn.submitRequest(header, request, response, registration);

    int err = reply.getErr();
    if (err != 0) {
        throw KeeperException.create(KeeperException.Code.get(err), clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}

调用线程把请求放入outgoingQueue队列中,然后阻塞:

/**
 * Queues a request and blocks the calling thread until the packet is marked finished.
 *
 * @param h                 header of the request
 * @param request           request body
 * @param response          record that receives the response body
 * @param watchRegistration watch to register when the reply arrives, or null
 * @return the reply header created for this request
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    final ReplyHeader replyHeader = new ReplyHeader();

    // Wrap everything in a Packet and append it to outgoingQueue.
    final Packet queued = queuePacket(h, replyHeader, request, response,
            null, null, null, null, watchRegistration);

    // The packet is its own monitor: wait() releases the lock until the packet
    // is notified, and the loop re-checks the flag on every wake-up.
    synchronized (queued) {
        while (!queued.finished) {
            queued.wait();
        }
    }
    return replyHeader;
}

SendThread负责io操作,发送请求,接收响应。

(此处原为配图,图片未随文本保留。)

发送:把outgoingQueue中的packet发送出去,然后把packet放到pendingQueue中,等待响应。

接收:解析响应,从pendingQueue中删除packet。

// ClientCnxnSocketNIO.doIO
/**
 * Performs one round of socket IO for the selected key: reads from the socket
 * when the key is readable and writes one sendable packet when it is writable.
 *
 * @param pendingQueue  packets already sent that are waiting for a response
 * @param outgoingQueue packets waiting to be sent
 * @param cnxn          owning connection; supplies xids and the send thread's auth state
 * @throws InterruptedException declared by the signature
 * @throws IOException          if the socket is null, or (as EndOfStreamException)
 *                              when the read returns a negative count
 */
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        // Read response bytes into the current incoming buffer.
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from server sessionid 0x"
                            + Long.toHexString(sessionId)
                            + ", likely server has closed socket");
        }
        // Only act once the current buffer has been filled completely.
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // The length prefix is complete; readLength() handles it.
                recvCount++;
                readLength();
            } else if (!initialized) {
                // First full frame on a not-yet-initialized connection: the connect result.
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
                // Go back to reading a length prefix for the next frame.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else { 
                // Parse the response.
                sendThread.readResponse(incomingBuffer);
                // Go back to reading a length prefix for the next frame.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    if (sockKey.isWritable()) {
        // Send a request.
        synchronized(outgoingQueue) {
            Packet p = findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    // Ping and auth packets do not get a fresh xid.
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                sock.write(p.bb);
                // The write may be partial; the packet only moves on once fully written.
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    // Remove the packet from outgoingQueue.
                    outgoingQueue.removeFirstOccurrence(p);
                    // Ping and auth packets (and packets without a header) are not tracked in pendingQueue.
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            // Append the packet to pendingQueue to await its response.
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                // Nothing left to send.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // A packet was fully written on a not-yet-initialized connection:
                // stop writing (presumably until the connect result arrives — confirm).
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }
}

解析响应:

// SendThread class
/**
 * Decodes one reply frame. Watch notifications (xid -1) are converted to a
 * WatchedEvent and queued on the event thread; any other reply is matched
 * against the head of pendingQueue, copied into that packet and the packet
 * is finished.
 *
 * @param incomingBuffer buffer holding the serialized reply
 * @throws IOException if a reply arrives while pendingQueue is empty, or if its
 *                     xid differs from the xid of the packet at the head of the queue
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // Other code omitted in this excerpt.
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header");

    // xid -1 marks a notification.
    if (replyHdr.getXid() == -1) {
        // Deserialize the payload into a WatcherEvent.
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        // Hand the event to the event thread's queue.
        eventThread.queueEvent(we);
        return;
    }
    // Regular response data.
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // Take the packet at the head of the queue.
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try {
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            // Mismatch: mark the packet as connection loss before throwing.
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order.");
        }

        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        // Only a positive zxid updates lastZxid.
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // Deserialize the payload into the packet's response record.
            packet.response.deserialize(bbia, "response");
        }
    } finally {
        // Runs on the error path too, so the packet is always finished.
        finishPacket(packet);
    }
}

调用线程现在还阻塞着呢,需要有人唤醒:

/**
 * Completes a packet: registers its watch (if any) with the reply's error code,
 * marks it finished, and then either queues it on the event thread (when it has
 * a callback) or wakes the threads waiting on it.
 *
 * @param p the packet whose reply has been processed
 */
private void finishPacket(Packet p) {
    final WatchRegistration registration = p.watchRegistration;
    if (registration != null) {
        registration.register(p.replyHeader.getErr());
    }

    // Packets with a callback are completed on the event thread.
    if (p.cb != null) {
        p.finished = true;
        eventThread.queuePacket(p);
        return;
    }

    // No callback: flip the flag under the packet's monitor and wake every
    // thread waiting on the packet.
    synchronized (p) {
        p.finished = true;
        p.notifyAll();
    }
}

问题:

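SendThread按顺序发送outgoingQueue中的Packet,并按同样的顺序放入pendingQueue;但在网络环境中,先发出的数据包未必先到达,

zk凭什么确信收到响应的顺序和pendingQueue中的顺序一致?

原因有三点:第一,客户端的一个会话只使用一条TCP连接,TCP保证同一连接上的字节流按发送顺序交付,IP层数据包的乱序对应用不可见;第二,服务端对同一会话的请求按FIFO顺序处理,并按该顺序返回响应;第三,readResponse还会校验xid,一旦响应的xid与pendingQueue队首Packet的xid不一致,就把该Packet标记为CONNECTIONLOSS并抛出IOException("Xid out of order."),而不会把响应错配给其他请求。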