zookeeper server处理客户端命令的流程

zk server处理命令涉及到3个类、2个线程:一个命令请求会先后经过PrepRequestProcessor、SyncRequestProcessor、FinalRequestProcessor这三个处理器。

PrepRequestProcessor类对应线程ProcessThread,SyncRequestProcessor类对应线程SyncThread。

在命令到达PrepRequestProcessor之前,还有一段路程:

zookeeper server处理客户端命令的流程

//ZooKeeperServer
/**
 * Decodes one incoming client packet and dispatches it according to the
 * operation type found in its request header.
 *
 * <p>Flow as visible in this excerpt: the {@code RequestHeader} is
 * deserialized from {@code incomingBuffer}; {@code OpCode.auth} packets are
 * handled in an elided branch that returns early; {@code OpCode.sasl} packets
 * are answered directly on the connection; every other type is wrapped in a
 * {@code Request} and handed to {@code submitRequest}.
 *
 * @param cnxn the connection the packet arrived on; supplies the session id
 *             and auth info used to build the {@code Request}
 * @param incomingBuffer the raw packet bytes, beginning with the serialized
 *                       request header
 * @throws IOException declared by the signature; presumably raised by header
 *                     deserialization or by sending the response (not
 *                     confirmable from this excerpt)
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");
    // slice() returns a buffer whose content starts at incomingBuffer's
    // current position. Presumably reading the header above advanced that
    // position, so the slice starts at the request payload (txn).
    // NOTE(review): the original comment said "will not be pointing to the
    // start of the txn", which reads like a typo for "will now be".
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        ...
        return;
    } else {
        if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer,cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
            // NOTE(review): there is no return here, so the sasl branch also
            // falls through to incrOutstandingRequests at the end of the
            // method - confirm that this is intended.
        }
        else { // the default branch: ordinary requests take this path
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
              h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
    // Reached by the sasl and default branches; the auth branch returned above.
    cnxn.incrOutstandingRequests(h);
}

/**
 * Hands a request to the head of the processor chain.
 *
 * <p>The session behind the request is touched first. A request whose type
 * is not recognised is logged and routed to an
 * {@code UnimplementedRequestProcessor}; a recognised one goes to
 * {@code firstProcessor}, and the in-process counter is bumped when the
 * request is bound to a connection.
 *
 * @param si the request to submit
 */
public void submitRequest(Request si) {
    // Other code from the original method is omitted in this excerpt.
    touch(si.cnxn);

    if (!Request.isValid(si.type)) {
        // Unknown op code: warn, then let the fallback processor deal with it.
        LOG.warn("Received packet at server of unknown type " + si.type);
        new UnimplementedRequestProcessor().processRequest(si);
        return;
    }

    firstProcessor.processRequest(si);
    if (si.cnxn != null) {
        incInProcess();
    }
}

请求进入firstProcessor(即PrepRequestProcessor)后:

//PrepRequestProcessor类片段
/**
 * Excerpt of PrepRequestProcessor: a thread-backed {@code RequestProcessor}.
 *
 * <p>{@link #processRequest(Request)} only enqueues the request; the thread's
 * {@link #run()} loop takes requests off the queue one at a time and passes
 * each to {@link #pRequest(Request)}, which forwards it to
 * {@code nextProcessor}.
 */
public class PrepRequestProcessor extends Thread implements RequestProcessor {
    // Requests handed over by callers, waiting to be picked up by run().
    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
    // The processor that pRequest forwards each request to.
    RequestProcessor nextProcessor;

    /**
     * Enqueues the request for the processing thread; no processing happens
     * on the caller's thread.
     *
     * @param request the request to queue
     */
    public void processRequest(Request request) {
       // This method only adds the request to the blocking queue.
       submittedRequests.add(request);
    }

    /**
     * Processing loop: takes requests from the queue and runs pRequest on
     * each until the {@code Request.requestOfDeath} sentinel is seen or an
     * exception escapes the loop.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Take the next request from the queue (blocks while it is empty).
                Request request = submittedRequests.take();
                // Pings are traced under their own mask.
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                // Sentinel request: leave the loop and let the thread finish.
                if (Request.requestOfDeath == request) {
                    break;
                }
                // Process the request. Per the author's note, write requests
                // get a hdr set while read requests keep hdr == null (the
                // hdr-setting code is elided from pRequest in this excerpt).
                pRequest(request);
            }
        } catch (InterruptedException e) {
            // NOTE(review): the interruption is logged but the thread's
            // interrupt status is not restored before the method returns.
            LOG.error("Unexpected interruption", e);
        } catch (RequestProcessorException e) {
            // An XidRolloverException cause is additionally logged at info level.
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            LOG.error("Unexpected exception", e);
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        }
        // The try block wraps the whole loop, so any exception caught above
        // also ends the loop and execution arrives here.
        LOG.info("PrepRequestProcessor exited loop!");
    }

    /**
     * Prepares a single request and forwards it to {@code nextProcessor}.
     * Most of the real method is elided in this excerpt.
     *
     * @param request the request to prepare; its hdr, txn and zxid fields are
     *                written here
     * @throws RequestProcessorException declared by the signature; the code
     *         that may raise it is not visible in this excerpt
     */
    protected void pRequest(Request request) throws RequestProcessorException {
        // The real method is long; only the key logic is kept here.
        request.hdr = null;
        request.txn = null;
        ……
        request.zxid = zks.getZxid();
        // Per the author, nextProcessor here is SyncRequestProcessor, so this
        // call places the request on SyncRequestProcessor's queue.
        nextProcessor.processRequest(request);
    }
}

进入SyncRequestProcessor后:

//SyncRequestProcessor代码片段
/**
 * Excerpt of SyncRequestProcessor: a thread-backed {@code RequestProcessor}
 * that appends requests to the database log, batches them in {@code toFlush},
 * and periodically rolls the log and starts a snapshot thread.
 *
 * <p>Several members used by {@link #run()} (for example {@code toFlush},
 * {@code r}, {@code snapCount}, {@code randRoll}, {@code snapInProcess},
 * {@code running}, {@code zks}, {@code requestOfDeath} and {@code flush})
 * are declared outside this excerpt.
 */
public class SyncRequestProcessor extends Thread implements RequestProcessor {
    // Requests handed over by the previous processor, waiting for run().
    private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    // Downstream processor; run() null-checks it before use.
    private final RequestProcessor nextProcessor;
    
    /**
     * Enqueues the request for the sync thread.
     *
     * @param request the request to queue
     */
    public void processRequest(Request request) {
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }
    
    // Rough summary of the policy (the author's reading of the loop below):
    //  - requests in queuedRequests are handled first;
    //  - a write request is first appended to the log, then added to toFlush;
    //  - toFlush is flushed when queuedRequests has nothing available,
    //  - or when toFlush.size() > 1000.
    @Override
    public void run() {
        try {
            // Number of appended records since the log was last rolled.
            int logCount = 0;

            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            setRandRoll(r.nextInt(snapCount/2));
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    // Nothing is pending a flush, so block until a request arrives.
                    si = queuedRequests.take();
                } else {
                    // Something is pending: poll without blocking, and if no
                    // request is immediately available, flush what we have.
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                // Sentinel request: leave the loop.
                if (si == requestOfDeath) {
                    break;
                }
                // Given the branches above, si is non-null whenever this
                // point is reached.
                if (si != null) {
                    // track the number of records written to the log
                    // Per the author: zks.getZKDatabase().append(si) returns
                    // true for write requests and false for read requests;
                    // write requests carry a hdr, read requests have hdr == null.
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            // Pick a fresh random offset for the next roll.
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                // A previous snapshot thread is still running; skip this one.
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                // Take the snapshot on a separate thread.
                                snapInProcess = new Thread("Snapshot Thread") {
                                        public void run() {
                                            try {
                                                zks.takeSnapshot();
                                            } catch(Exception e) {
                                                LOG.warn("Unexpected exception", e);
                                            }
                                        }
                                    };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // Read requests take this branch when toFlush is empty.
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // Reached when append returned true, or when it returned
                    // false while toFlush was non-empty: the request joins
                    // the pending batch.
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            // Any error escaping the loop is treated as fatal: it is logged,
            // running is cleared, and the JVM exits with status 11.
            LOG.error("Severe unrecoverable error, exiting", t);
            running = false;
            System.exit(11);
        }
        LOG.info("SyncRequestProcessor exited!");
    }
}