zk如何实现watch

在客户端发送命令:stat /zhang watch

在zk server中产生如下图的调用栈:

(图:zk如何实现watch —— zk server 处理该命令时的调用栈)

// Declared in the DataTree class: the WatchManager instance held in the
// dataWatches field.
private final WatchManager dataWatches = new WatchManager();

// Declared in the WatchManager class.
// Forward index: node path -> set of watchers registered on that path.
// addWatch adds to it; triggerWatch removes the whole entry for a path.
private final HashMap<String, HashSet<Watcher>> watchTable =
    new HashMap<String, HashSet<Watcher>>();

// Reverse index: watcher -> set of paths that watcher is registered on.
// addWatch adds to it; triggerWatch removes the triggered path from each
// affected watcher's set.
private final HashMap<Watcher, HashSet<String>> watch2Paths =
    new HashMap<Watcher, HashSet<String>>();

我们详细分析addWatch代码:

/**
 * Registers {@code watcher} on {@code path}, recording the pair in both
 * the path-to-watchers index and the watcher-to-paths index.
 *
 * <p>In the walkthrough above, {@code path} is the "/zhang" from the client
 * command and the {@code Watcher} is the client's connection object,
 * NIOServerCnxn; in other words the client itself acts as the Watcher:
 * <pre>
 *   public abstract class ServerCnxn implements Stats, Watcher
 *   public class NIOServerCnxn extends ServerCnxn
 * </pre>
 *
 * <p>The method is synchronized on this instance.
 *
 * @param path    node path to watch
 * @param watcher watcher to notify when the path is triggered
 */
public synchronized void addWatch(String path, Watcher watcher) {
    // Forward index: many clients may watch the same path.
    HashSet<Watcher> watchersOnPath = watchTable.get(path);
    if (watchersOnPath != null) {
        watchersOnPath.add(watcher);
    } else {
        // Start with a small capacity (4) so nodes with only a few
        // watches do not waste memory.
        watchersOnPath = new HashSet<Watcher>(4);
        watchersOnPath.add(watcher);
        watchTable.put(path, watchersOnPath);
    }

    // Reverse index: one client may watch many paths.
    HashSet<String> pathsOfWatcher = watch2Paths.get(watcher);
    if (pathsOfWatcher != null) {
        pathsOfWatcher.add(path);
    } else {
        // Connections typically hold many watches, so the default
        // capacity is used here.
        pathsOfWatcher = new HashSet<String>();
        pathsOfWatcher.add(path);
        watch2Paths.put(watcher, pathsOfWatcher);
    }
}

在创建、删除、设置节点数据时,会触发watch:

/**
 * Fires the watches registered on {@code path}.
 *
 * <p>Under this instance's lock, the watcher set for {@code path} is
 * removed from {@code watchTable} and {@code path} is removed from each of
 * those watchers' entries in {@code watch2Paths}. After the lock is
 * released, every removed watcher that is not in {@code supress} has its
 * {@code process} method called with the generated event.
 *
 * @param path    node path whose watches are triggered
 * @param type    event type placed in the generated WatchedEvent
 * @param supress watchers that must not be notified; may be {@code null}
 * @return the watcher set removed for {@code path} (including any
 *         suppressed watchers), or {@code null} if there was none or it
 *         was empty
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // Build the event delivered to every notified watcher.
    WatchedEvent event = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> triggered;
    synchronized (this) {
        // Drop the path -> watchers entry.
        triggered = watchTable.remove(path);
        if (triggered == null || triggered.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        // Drop the path from each affected watcher's reverse-index entry.
        for (Watcher watcher : triggered) {
            HashSet<String> watchedPaths = watch2Paths.get(watcher);
            if (watchedPaths != null) {
                watchedPaths.remove(path);
            }
        }
    }
    // Notify outside the synchronized block; for a client connection this
    // ends up in NIOServerCnxn.process(WatchedEvent event).
    for (Watcher watcher : triggered) {
        if (supress == null || !supress.contains(watcher)) {
            watcher.process(event);
        }
    }
    return triggered;
}