zookeeper记录3(使用ZooKeeper原生Java API进行客户端开发)
1、建立客户端与zk服务端的连接
2、zk会话重连
3、同步或异步创建节点
4、修改节点数据
5、同步或异步删除节点数据
6、节点查询
6.1、获取节点数据
6.2、获取子节点列表
6.3、判断节点是否存在
7、acl权限列表
7.1、自定义用户权限
7.2、acl之ip权限
1、建立客户端与zk服务端的连接 <--返回目录
将下载的zookeeper-3.4.11.tar.gz解压,在里面找到需要的jar。
ZkConnect
package com.oy;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @Auther: oy
* @Date: 2021/3/20 20:18
* @Version 1.0
*/
public class ZkConnect implements Watcher {
private static final Logger log = LoggerFactory.getLogger(ZkConnect.class);
//public static final String zkServerPath = "192.168.213.200:2181";
public static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public static final Integer timeout = 5000;
/**
* 客户端与zkServer连接是一个异步的过程,当连接成功后,客户端会收到一个watch通知
* 参数:
* connectString: 连接服务器的ip字符串
* sessionTimeout: 超时时间,心跳收不到了,就超时
* watcher: 通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,就设置为null
* canBeReadOnly: 可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写;此时数据被读取到的可能
* 是旧数据,此处建议设置为false
* sessionId: 会话id
* sessionPasswd: 会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话
*/
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect());
log.warn("客户端开始连接zookeeper服务器。。。连接状态: {}", zk.getState());
new Thread().sleep(2000); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
}
}
log4j.properties
log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.encoding=UTF-8
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n
2、zk会话重连 <--返回目录
package com.oy;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkConnect2 implements Watcher {
private static final Logger log = LoggerFactory.getLogger(ZkConnect2.class);
//public static final String zkServerPath = "192.168.213.200:2181";
public static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public static final Integer timeout = 5000;
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZkConnect2());
long sessionId = zk.getSessionId();
byte[] sessionPasswd = zk.getSessionPasswd();
log.warn("客户端开始连接zookeeper服务器。。。连接状态: {}", zk.getState());
new Thread().sleep(2000); // 如果不停顿一段时间, 会收不到watch通知
log.warn("连接状态: {}", zk.getState());
new Thread().sleep(1000);
log.warn("开始会话重连...");
ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZkConnect2(), sessionId, sessionPasswd);
log.warn("重新连接, 状态: {}", zk.getState());
new Thread().sleep(1000);
log.warn("重新连接, 状态: {}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
}
}
3、同步或异步创建节点 <--返回目录
ZkNodeOperator
package com.oy;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
public class ZkNodeOperator implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZkNodeOperator.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZkNodeOperator() {}
public ZkNodeOperator(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZkNodeOperator());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ZkNodeOperator zkNodeOperator = new ZkNodeOperator(zkServerPath);
zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
/**
* 同步或异步创建节点,都不支持字节点的递归创建,异步有一个callback函数
* 参数:
* path: 创建的路径
* data: 存储的数据
* acl: 控制权限策略. Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* Ids.CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode: 节点类型,是一个枚举
* PERSISTENT 持久节点
* PERSISTENT_SEQUENTIAL 持久顺序节点
* EPHEMERAL 临时节点
* EPHEMERAL_SEQUENTIAL 临时顺序节点
*
* @param path
* @param data
* @param acls
*/
private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
String result = "";
try {
// 同步创建
//result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
//log.warn("同步创建临时节点: {} 成功。。。", result);
// 异步创建
String ctx = "{'create':'success'}";
zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL, new CreateNodeCallBack(), ctx);
new Thread().sleep(2000);
log.warn("异步创建临时节点: {} 成功。。。", result);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
log.warn("客户端连接接收到watch通知: {}", event);
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
CreateNodeCallBack
package com.oy;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 异步创建节点的回调函数
* @Auther: oy
* @Date: 2021/3/20 21:28
* @Version 1.0
*/
public class CreateNodeCallBack implements AsyncCallback.StringCallback {
private static final Logger log = LoggerFactory.getLogger(CreateNodeCallBack.class);
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.warn("异步创建节点:{}, ctx: {}", path, (String)ctx);
}
}
4、修改节点数据 <--返回目录
public static void main(String[] args) throws KeeperException, InterruptedException {
ZkNodeOperator zkNodeOperator = new ZkNodeOperator(zkServerPath);
// 创建节点
//zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
// 修改节点数据 第三个参数是版本号dataVersion,用于乐观锁控制
Stat stat = zkNodeOperator.getZooKeeper().setData("/testnode", "new data".getBytes(), 0);
log.warn("修改后, dataVersion版本: {}", stat.getVersion());
}
5、同步或异步删除节点数据 <--返回目录
public static void main(String[] args) throws KeeperException, InterruptedException {
ZkNodeOperator zkNodeOperator = new ZkNodeOperator(zkServerPath);
// 创建节点
//zkNodeOperator.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
// 修改节点数据
//Stat stat = zkNodeOperator.getZooKeeper().setData("/testnode", "new data".getBytes(), 0);
//log.warn("修改后, 版本: {}", stat.getVersion());
// 同步删除节点
//zkNodeOperator.getZooKeeper().delete("/testnode", 1); // 第二个参数 dataVersion
// 异步删除节点
String ctx = "{'delete':'success'}";
zkNodeOperator.getZooKeeper().delete("/testnode", 0, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
log.warn("异步删除节点:{}, ctx: {}", path, (String)ctx);
}
}, ctx);
}
6、节点查询 <--返回目录
6.1、获取节点数据 <--返回目录
代码
package com.oy.query;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: oy
* @Date: 2021/3/20 22:20
* @Version 1.0
*/
public class ZKGetNodeData implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKGetNodeData.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZKGetNodeData() {}
public ZKGetNodeData(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath);
// 第一个参数: 节点path; 第二个参数: true注册一个监听事件; 第三个参数: 获取的结果会保存在stat
byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", true, stat);
log.warn("当前值: {}", new String(result));
countDownLatch.await();
}
@Override
public void process(WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
ZKGetNodeData zkGetNodeData = new ZKGetNodeData(zkServerPath);
byte[] result = zkGetNodeData.getZooKeeper().getData("/testnode", false, stat);
log.warn("监听到值已经更改, 更改后的值为: {}, 版本号: {}", new String(result), stat.getVersion());
countDownLatch.countDown(); // 计数器减1
} else if (event.getType() == Event.EventType.NodeCreated) {
} else if (event.getType() == Event.EventType.NodeDeleted) {
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
}
} catch (Exception e) {
e.printStackTrace();
}
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
测试
1)首先创建节点 create /testnode aaa
2) 运行上面代码main方法,给节点 /testnode 注册一个监听, 然后计数器countDownLatch等待
3)然后改变 /testnode 的值:set /testnode bbb,watcher的process方法被执行, 判断是NodeDataChanged事件,计数器减1
6.2、获取子节点列表 <--返回目录
代码
package com.oy.query;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: oy
* @Date: 2021/3/20 22:50
* @Version 1.0
*/
public class ZKGetChildrenList implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKGetChildrenList.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZKGetChildrenList() {}
public ZKGetChildrenList(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath);
// 同步调用: 参数1 节点路径, 参数2 true或false, 注册一个watch事件
List<String> children = zkGetChildrenList.getZooKeeper().getChildren("/testnode", true);
for (String child : children) {
log.warn(child);
}
// 异步调用
// String ctx = "{'callback':'ChildrenCallback'}";
// zkGetChildrenList.getZooKeeper().getChildren("/testnode", true, new AsyncCallback.ChildrenCallback() {
// @Override
// public void processResult(int rc, String path, Object ctx, List<String> children) {
// log.warn("callback, path: {}, children: {}", path, children.toString());
// }
// }, ctx);
countDownLatch.await();
}
@Override
public void process(WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
} else if (event.getType() == Event.EventType.NodeCreated) {
} else if (event.getType() == Event.EventType.NodeDeleted) {
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
ZKGetChildrenList zkGetChildrenList = new ZKGetChildrenList(zkServerPath);
List<String> children = zkGetChildrenList.getZooKeeper().getChildren("/testnode", false);
log.warn("监听到子节点改变, 改变后子节点数组为:");
for (String child : children) {
log.warn(child);
}
countDownLatch.countDown(); // 计数器减1
}
} catch (Exception e) {
e.printStackTrace();
}
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
测试:
1) 首先运行上面代码的main方法,给/testnode注册一个监听
2) 给/testnode新建一个子节点 creat /testnode/son2 bbb
3) 测试异步调用
6.3、判断节点是否存在 <--返回目录
package com.oy.query;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: oy
* @Date: 2021/3/20 23:21
* @Version 1.0
*/
public class ZKNodeExist implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKNodeExist.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZKNodeExist() {}
public ZKNodeExist(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZKNodeExist());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZKNodeExist zkNodeExist = new ZKNodeExist(zkServerPath);
Stat stat = zkNodeExist.getZooKeeper().exists("/testnode", true);
if (stat == null) {
log.warn("节点/testnode不存在");
} else {
log.warn("节点/testnode存在. stat: {}", stat);
}
countDownLatch.await();
}
@Override
public void process(WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
} else if (event.getType() == Event.EventType.NodeCreated) {
} else if (event.getType() == Event.EventType.NodeDeleted) {
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
}
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
7、acl权限列表 <--返回目录
前面我们的代码创建节点使用了Ids.OPEN_ACL_UNSAFE 相当于 world:anyone:cdrwa
7.1、自定义用户权限 <--返回目录
package com.oy.acl;
import com.oy.CreateNodeCallBack;
import com.oy.ZkNodeOperator;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.acl.Acl;
import java.util.ArrayList;
import java.util.List;
/**
* @Auther: oy
* @Date: 2021/3/21 00:33
* @Version 1.0
*/
public class ZKNodeAcl implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKNodeAcl.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZKNodeAcl() {}
public ZKNodeAcl(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
ZKNodeAcl zkNodeAcl = new ZKNodeAcl(zkServerPath);
// 创建节点
//zkNodeAcl.createZKNode("/testnode", "testnode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
// 自定义用户认证访问
ArrayList<ACL> acls = new ArrayList<>();
Id userPwd1 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan1:123"));
Id userPwd2 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan2:123"));
acls.add(new ACL(ZooDefs.Perms.ALL, userPwd1));
acls.add(new ACL(ZooDefs.Perms.READ, userPwd2));
acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.DELETE, userPwd2));
// 创建节点
//zkNodeAcl.createZKNode("/testacl", "testacl".getBytes(), acls);
// 获取节点数据 org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /testacl
//Stat stat = new Stat();
//byte[] result = zkNodeAcl.getZooKeeper().getData("/testacl", false, stat);
// 获取节点数据 通过addAuthInfo操作节点
zkNodeAcl.getZooKeeper().addAuthInfo("digest", "zhangsan1:123".getBytes());
Stat stat = new Stat();
byte[] result = zkNodeAcl.getZooKeeper().getData("/testacl", false, stat);
log.warn("当前值: {}, 版本: {}", new String(result), stat.getVersion());
}
/**
* 创建节点
* @param path
* @param data
* @param acls
*/
private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
String result = "";
try {
// 同步创建
result = zooKeeper.create(path, data, acls, CreateMode.PERSISTENT);
log.warn("同步创建临时节点: {} 成功。。。", result);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
测试
7.2、acl之ip权限 <--返回目录
package com.oy.acl;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.acl.Acl;
import java.util.ArrayList;
/**
* @Auther: oy
* @Date: 2021/3/21 00:33
* @Version 1.0
*/
public class ZKNodeAcl implements Watcher {
private ZooKeeper zooKeeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKNodeAcl.class);
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
private static final Integer timeout = 5000;
public ZKNodeAcl() {}
public ZKNodeAcl(String connectString) {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl());
} catch (Exception e) {
e.printStackTrace();
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
ZKNodeAcl zkNodeAcl = new ZKNodeAcl(zkServerPath);
// ip 方式的 acl
ArrayList<ACL> aclsIP = new ArrayList<>();
Id ipId1 = new Id("ip", "192.168.213.1");
aclsIP.add(new ACL(ZooDefs.Perms.ALL, ipId1));
// 创建节点
//zkNodeAcl.createZKNode("/testaclip", "testaclip".getBytes(), aclsIP);
// 验证ip是否有权限
Stat stat = new Stat();
byte[] result = zkNodeAcl.getZooKeeper().getData("/testaclip", false, stat);
log.warn("当前值: {}, 版本: {}", new String(result), stat.getVersion());
}
/**
* 创建节点
* @param path
* @param data
* @param acls
*/
private void createZKNode(String path, byte[] data, ArrayList<ACL> acls) {
String result = "";
try {
// 同步创建
result = zooKeeper.create(path, data, acls, CreateMode.PERSISTENT);
log.warn("同步创建临时节点: {} 成功。。。", result);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
log.warn("接收到watch通知: {}", event);
}
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
}
--