Zookeeper应用场景
分类:
IT文章
•
2022-03-30 10:41:46

-
数据发布与订阅:发布订阅模型,就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现数据的集中管理和动态更新。
-
配置中心:在应用中,将全局的配置信息放到ZK上集中管理,在应用启动的时候主动获取一次配置。同时,在节点上注册一个watcher,保证每次数据更新时会通知订阅者更新配置信息。
-
元数据维护:在分布式搜索服务中,索引的元信息和服务器集群机器的节点状态保存在ZK的指定节点上,供各个客户端使用。
-
分布式日志收集系统:这个系统的核心工作是收集不同服务器的日志。收集器通常是按照应用来分配任务单元,因此在ZK上用应用名创建一个节点,将需要收集的服务器的IP注册到这个节点的子结点上。
-
命名服务:客户端应用能够根据指定名字来获取资源或服务的地址、提供者等信息。
-
分布式通知/协调:ZK中特有的watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是:不同系统对ZK上的同一个节点进行注册,监听节点的变化。任意一个系统对节点进行了更新,其它系统都能接到通知,并进行相应处理。
package zookeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
public class Server {
private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
private static final int TIME_OUT = 15000;
private ZooKeeper zooKeeper;
public Server() throws IOException {
this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> {
});
}
/**
* 创建临时序列节点(服务器关闭时节点会自动删除)
* @param serverName
* @return
* @throws Exception
*/
public String regsterServer(String serverName) throws Exception {
String result = zooKeeper.create("/clusterServer/server", serverName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(serverName+" server start....");
return result;
}
public static void main(String[] args) throws Exception {
Server server = new Server();
server.regsterServer(args[0]);
//保持程序运行,防止程序停止运行导致节点自动删除
while (true) {
}
}
}
View Code
package zookeeper;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* 获取并监听注册的服务器列表
*/
public class Client {
private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
private static final int TIME_OUT = 15000;
private ZooKeeper zooKeeper;
public Client() throws IOException {
this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> {
System.out.println("watcher works.");
try {
getServers();
} catch (Exception e) {
e.printStackTrace();
}
});
}
public void getServers() throws Exception {
List<String> servers = new ArrayList<>();
List<String> children = zooKeeper.getChildren("/clusterServer", true, null);
for (String child : children) {
String server = getData("/clusterServer/"+child);
servers.add(server);
}
System.out.println(servers);
}
public String getData(String path) throws Exception {
byte[] data = zooKeeper.getData(path, true, null);
return new String(data);
}
public static void main(String[] args) throws Exception {
Client client = new Client();
client.getServers();
while (true) {
}
}
}
View Code
具体的实现方式是:
/**
* 通过创建临时节点,实现服务器之间的独占锁
*/
@Test
public void singleLock() {
try {
//参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型
String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//创建成功,则相当于拥有独占锁,可以进行以下逻辑
//TODO 业务逻辑
System.out.println(nodePath);
//业务逻辑结束后,删除节点,即释放锁资源
zooKeeper.delete("/lock", -1);
} catch (Exception e) {
//创建节点失败,重新调用,直至创建成功
if (e instanceof KeeperException && "NODEEXISTS".equals(((KeeperException)e).code().name())) {
System.out.println("Node exists.");
singleLock();
}else {
e.printStackTrace();
}
}
}
View Code
/**
* 通过创建临时节点,实现服务器之间的独占锁
*/
@Test
public void singleLock2() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists("/lock", false);
//如果节点已经存在,等待其它服务器删除节点。即:等待其它服务器释放锁资源
while(stat != null) { }
//参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型
String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
//创建成功,则相当于拥有独占锁,可以进行以下逻辑
//TODO 业务逻辑
System.out.println(nodePath);
//业务逻辑结束后,删除节点,即释放锁资源
zooKeeper.delete("/lock", -1);
}
View Code
/**
* 通过创建临时时序节点,实现服务器之间的时序锁
*/
@Test
public void lock() throws KeeperException, InterruptedException {
//创建临时时序节点
String nodePath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while(true) {
List<String> children = zooKeeper.getChildren("/lock", false);
//排序,并获取序号最小的节点。(序号越小,表明请求时间越早,优先获取锁资源)
children.sort(String::compareTo);
if (nodePath.equals("/lock/"+children.get(0))){
//TODO 业务逻辑
System.out.println("TODO Logic.");
break;
}
}
//业务逻辑结束后,删除节点,即释放锁资源
zooKeeper.delete(nodePath, -1);
}
View Code
package zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
*
* 通过Zookeeper实现服务器之间的时序锁
*
*/
public class SeqLock {
private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
private static ZooKeeper zooKeeper;
private String thispath;
public SeqLock() throws IOException {
//超时时间单位:毫秒
zooKeeper = new ZooKeeper(CONNECT_STRING, 15000, event -> {
//监听“/lock”的子节点变化。如果有服务器释放锁,判断自己是否获取锁
if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
&& event.getPath().startsWith("/lock")) {
try {
List<String> children = zooKeeper.getChildren("/lock", false);
if (children.size() > 0) {
Collections.sort(children);
String fistNode = "/lock/"+children.get(0);
if (fistNode.equals(thispath)){
doSomethingAndDelNode();
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 通过创建临时时序节点注册时序锁,并监听服务器列表
*/
public void lock() throws KeeperException, InterruptedException {
thispath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(thispath);
List<String> children = zooKeeper.getChildren("/lock", false);
//如果只有一个子节点,说明锁资源被当前服务器持有。如果子节点不止一个,说明锁资源已经被其它服务器持有
if(children.size() == 1) {
doSomethingAndDelNode();
}
}
private void doSomethingAndDelNode() throws InterruptedException, KeeperException {
//TODO 业务逻辑
System.out.println("TODO Logic.");
//业务逻辑结束后,删除节点,即释放锁资源
zooKeeper.delete(thispath, -1);
}
public static void main(String[] args) {
try {
SeqLock lock = new SeqLock();
lock.lock();
} catch (Exception e) {
e.printStackTrace();
}
}
}