zookeeper(5) 客户端
zookeeper客户端主要负责与用户进行交互,将命令发送到服务器,接收服务器的响应,并反馈给用户。主要分为以下三层:
用户命令处理层
用户命令处理层的功能是读取用户输入的命令,解析用户命令和输入参数,根据命令和参数,进行一些校验,然后执行节点操作。
源码实例(ZooKeeperMain):
1 public class ZooKeeperMain { 2 // 命令解析器。用于解析命令 3 protected MyCommandOptions cl = new MyCommandOptions(); 4 5 // 主函数 6 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { 7 // 运行客户端 8 ZooKeeperMain main = new ZooKeeperMain(args); 9 main.run(); 10 } 11 12 public ZooKeeperMain(String args[]) throws IOException, InterruptedException { 13 // 解析启动参数 14 cl.parseOptions(args); 15 // 获取server参数,连接服务器 16 connectToZK(cl.getOption("server")); 17 18 } 19 20 // 连接服务器 21 protected void connectToZK(String newHost) throws InterruptedException, IOException { 22 host = newHost; 23 zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher()); 24 } 25 26 void run() throws KeeperException, IOException, InterruptedException { 27 // 循环读取命令, 28 BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 29 String line; 30 while ((line = br.readLine()) != null) { 31 // 执行命令 32 executeLine(line); 33 } 34 } 35 36 public void executeLine(String line) throws InterruptedException, IOException, KeeperException { 37 if (!line.equals("")) { 38 // 解析命令 39 cl.parseCommand(line); 40 // 执行命令 41 processZKCmd(cl); 42 } 43 } 44 45 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { 46 // 读取命令和参数 47 Stat stat = new Stat(); 48 String[] args = co.getArgArray(); 49 String cmd = co.getCommand(); 50 boolean watch = args.length > 2; 51 String path = null; 52 List<ACL> acl = Ids.OPEN_ACL_UNSAFE; 53 // 执行不同的命令,主要是进行一些校验,然后调用zookeeper方法 54 if (cmd.equals("quit")) { 55 zk.close(); 56 System.exit(0); 57 } else if (cmd.equals("redo") && args.length >= 2) { 58 Integer i = Integer.decode(args[1]); 59 if (commandCount <= i) { 60 return false; 61 } 62 cl.parseCommand(history.get(i)); 63 history.put(commandCount, history.get(i)); 64 processCmd(cl); 65 } else if (cmd.equals("history")) { 66 for (int i = commandCount - 10; i <= commandCount; ++i) { 67 if (i < 0) 68 continue; 69 
System.out.println(i + " - " + history.get(i)); 70 } 71 } else if (cmd.equals("printwatches")) { 72 if (args.length == 1) { 73 System.out.println("printwatches is " + (printWatches ? "on" : "off")); 74 } else { 75 printWatches = args[1].equals("on"); 76 } 77 } else if (cmd.equals("connect")) { 78 if (args.length >= 2) { 79 connectToZK(args[1]); 80 } else { 81 connectToZK(host); 82 } 83 } 84 if (cmd.equals("create") && args.length >= 3) { 85 int first = 0; 86 CreateMode flags = CreateMode.PERSISTENT; 87 if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { 88 first += 2; 89 flags = CreateMode.EPHEMERAL_SEQUENTIAL; 90 } else if (args[1].equals("-e")) { 91 first++; 92 flags = CreateMode.EPHEMERAL; 93 } else if (args[1].equals("-s")) { 94 first++; 95 flags = CreateMode.PERSISTENT_SEQUENTIAL; 96 } 97 if (args.length == first + 4) { 98 acl = parseACLs(args[first + 3]); 99 } 100 path = args[first + 1]; 101 String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags); 102 } else if (cmd.equals("delete") && args.length >= 2) { 103 path = args[1]; 104 zk.delete(path, watch ? Integer.parseInt(args[2]) : -1); 105 } else if (cmd.equals("set") && args.length >= 3) { 106 path = args[1]; 107 stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); 108 printStat(stat); 109 } else if (cmd.equals("aget") && args.length >= 2) { 110 path = args[1]; 111 zk.getData(path, watch, dataCallback, path); 112 } else if (cmd.equals("get") && args.length >= 2) { 113 path = args[1]; 114 byte data[] = zk.getData(path, watch, stat); 115 data = (data == null) ? 
"null".getBytes() : data; 116 System.out.println(new String(data)); 117 printStat(stat); 118 } else if (cmd.equals("ls") && args.length >= 2) { 119 path = args[1]; 120 List<String> children = zk.getChildren(path, watch); 121 System.out.println(children); 122 } else if (cmd.equals("ls2") && args.length >= 2) { 123 path = args[1]; 124 List<String> children = zk.getChildren(path, watch, stat); 125 System.out.println(children); 126 printStat(stat); 127 } else if (cmd.equals("getAcl") && args.length >= 2) { 128 path = args[1]; 129 acl = zk.getACL(path, stat); 130 for (ACL a : acl) { 131 System.out.println(a.getId() + ": " + getPermString(a.getPerms())); 132 } 133 } else if (cmd.equals("setAcl") && args.length >= 3) { 134 path = args[1]; 135 stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1); 136 printStat(stat); 137 } else if (cmd.equals("stat") && args.length >= 2) { 138 path = args[1]; 139 stat = zk.exists(path, watch); 140 printStat(stat); 141 } else if (cmd.equals("listquota") && args.length >= 2) { 142 path = args[1]; 143 String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode; 144 byte[] data = null; 145 try { 146 data = zk.getData(absolutePath, false, stat); 147 StatsTrack st = new StatsTrack(new String(data)); 148 data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat); 149 System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString()); 150 } catch (KeeperException.NoNodeException ne) { 151 System.err.println("quota for " + path + " does not exist."); 152 } 153 } else if (cmd.equals("setquota") && args.length >= 4) { 154 String option = args[1]; 155 String val = args[2]; 156 path = args[3]; 157 System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path); 158 if ("-b".equals(option)) { 159 // we are setting the bytes quota 160 createQuota(zk, path, Long.parseLong(val), -1); 161 } else if 
("-n".equals(option)) { 162 // we are setting the num quota 163 createQuota(zk, path, -1L, Integer.parseInt(val)); 164 } else { 165 usage(); 166 } 167 168 } else if (cmd.equals("delquota") && args.length >= 2) { 169 // if neither option -n or -b is specified, we delete 170 // the quota node for thsi node. 171 if (args.length == 3) { 172 // this time we have an option 173 String option = args[1]; 174 path = args[2]; 175 if ("-b".equals(option)) { 176 delQuota(zk, path, true, false); 177 } else if ("-n".equals(option)) { 178 delQuota(zk, path, false, true); 179 } 180 } else if (args.length == 2) { 181 path = args[1]; 182 // we dont have an option specified. 183 // just delete whole quota node 184 delQuota(zk, path, true, true); 185 } else if (cmd.equals("help")) { 186 usage(); 187 } 188 } else if (cmd.equals("close")) { 189 zk.close(); 190 } else if (cmd.equals("addauth") && args.length >= 2) { 191 byte[] b = null; 192 if (args.length >= 3) 193 b = args[2].getBytes(); 194 195 zk.addAuthInfo(args[1], b); 196 } else { 197 usage(); 198 } 199 return watch; 200 } 201 }
除了基础的节点操作外,用户命令层还提供了节点配额的控制。节点配额的控制通过在/zookeeper/quota对应的目录下记录当前节点的数据大小和限制大小实现。
源码实例(ZooKeeperMain.createQuota):
1 public static boolean createQuota(ZooKeeper zk, String path, 2 long bytes, int numNodes) 3 throws KeeperException, IOException, InterruptedException 4 { 5 //判断指定路径是否存在 6 Stat initStat = zk.exists(path, false); 7 if (initStat == null) { 8 throw new IllegalArgumentException(path + " does not exist."); 9 } 10 String quotaPath = Quotas.quotaZookeeper; 11 String realPath = Quotas.quotaZookeeper + path; 12 try { 13 //判断在子节点中是否有限量设置 14 List<String> children = zk.getChildren(realPath, false); 15 for (String child: children) { 16 if (!child.startsWith("zookeeper_")) { 17 throw new IllegalArgumentException(path + " has child " + 18 child + " which has a quota"); 19 } 20 } 21 } catch(KeeperException.NoNodeException ne) { 22 // this is fine 23 } 24 //判断夫节点中是否有限量设置 25 checkIfParentQuota(zk, path); 26 //如果当前节点限量设置为空,逐级创建节点数据 27 if (zk.exists(quotaPath, false) == null) { 28 try { 29 zk.create(Quotas.procZookeeper, null, Ids.OPEN_ACL_UNSAFE, 30 CreateMode.PERSISTENT); 31 zk.create(Quotas.quotaZookeeper, null, Ids.OPEN_ACL_UNSAFE, 32 CreateMode.PERSISTENT); 33 } catch(KeeperException.NodeExistsException ne) { 34 // do nothing 35 } 36 } 37 String[] splits = path.split("/"); 38 StringBuilder sb = new StringBuilder(); 39 sb.append(quotaPath); 40 for (int i=1; i<splits.length; i++) { 41 sb.append("/" + splits[i]); 42 quotaPath = sb.toString(); 43 try { 44 zk.create(quotaPath, null, Ids.OPEN_ACL_UNSAFE , 45 CreateMode.PERSISTENT); 46 } catch(KeeperException.NodeExistsException ne) { 47 //do nothing 48 } 49 } 50 //创建限量设置节点 51 String statPath = quotaPath + "/" + Quotas.statNode; 52 quotaPath = quotaPath + "/" + Quotas.limitNode; 53 StatsTrack strack = new StatsTrack(null); 54 strack.setBytes(bytes); 55 strack.setCount(numNodes); 56 try { 57 zk.create(quotaPath, strack.toString().getBytes(), 58 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 59 StatsTrack stats = new StatsTrack(null); 60 stats.setBytes(0L); 61 stats.setCount(0); 62 zk.create(statPath, stats.toString().getBytes(), 63 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 64 } catch(KeeperException.NodeExistsException ne) { 65 byte[] data = zk.getData(quotaPath, false , new Stat()); 66 StatsTrack strackC = new StatsTrack(new String(data)); 67 if (bytes != -1L) { 68 strackC.setBytes(bytes); 69 } 70 if (numNodes != -1) { 71 strackC.setCount(numNodes); 72 } 73 zk.setData(quotaPath, strackC.toString().getBytes(), -1); 74 } 75 return true; 76 }