zookeeper分布式锁

一、使用zookeeper原生api实现分布式锁

1、思路一:利用Zookeeper不能重复创建一个节点的特性来实现一个分布式锁

流程:

  1. 需要获得锁时创建锁node节点。
  2. 如果创建失败,则表示该锁已经被别人占用,watch该节点状态,等待锁。
  3. 如果创建成功,则表示获得锁。
  4. 主动释放锁时删除对应的node节点即可。
  5. 获得锁的session超时或断开,由于锁node为临时节点则该节点也会删除。
  6. 节点删除时watch该节点的线程重新争抢锁。

代码:

public class ZookeeperLock1 implements Lock {

    private final String zookeeperLockRootNode = "zookeeperLock";

    private final ZooKeeper zooKeeper;
    private final String lockName;

    private class LockWatcher implements Watcher {

        // 该watch对应的需要获得锁的线程
        private final Thread thread;

        private LockWatcher(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                synchronized (thread) {
                    thread.notifyAll();
                }
            }
        }
    }

    public ZookeeperLock1(ZooKeeper zooKeeper, String lockName) throws KeeperException, InterruptedException {
        this.lockName = lockName;
        this.zooKeeper = zooKeeper;
        // 检查所有锁的根节点,如果没有则创建。
        try {
            zooKeeper.create("/" + zookeeperLockRootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                throw e;
            }
        }
    }

    @Override
    public void getLock() throws KeeperException, InterruptedException {
        String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
        while (true) {
            // 直接创建锁节点,创建成功则表示拿到锁则return。返回节点存在异常则表示获取锁失败则等待锁。
            try {
                zooKeeper.create(zookeeperLockNodeName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                break;
            } catch (KeeperException e) {
                if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                    throw e;
                }
            }
            // 添加节点监控
            zooKeeper.exists(zookeeperLockNodeName, new LockWatcher(Thread.currentThread()));
            synchronized (Thread.currentThread()) {
                // 线程等待锁,只有在删除节点的watch中才会重新激活线程
                Thread.currentThread().wait();
            }
        }
    }

    @Override
    public void freeLock() throws KeeperException, InterruptedException {
        String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
        zooKeeper.delete(zookeeperLockNodeName, -1);
    }
}

测试:

public class ZookeeperLock1Test {

    @Test
    public void testLock() throws IOException, InterruptedException {
        final String zookeeperHost = "10.5.31.155";
        final String zookeeperPort = "2181";
        final String lockName = "testLock1";
        final int threadCnt = 9;
        final CountDownLatch countDownLatchConnect = new CountDownLatch(1);
        final ZooKeeper zooKeeper = new ZooKeeper(zookeeperHost + ":" + zookeeperPort, 60000, event -> {
            if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                countDownLatchConnect.countDown();
            }
        });
        countDownLatchConnect.await();

        final CountDownLatch countDownLatchThread = new CountDownLatch(threadCnt);
        for (int i = 1; i <= threadCnt; i++) {
            Runnable runnable = () -> {
                try {
                    ZookeeperLock1 zookeeperLock1 = new ZookeeperLock1(zooKeeper, lockName);
                    zookeeperLock1.getLock();
                    System.out.println(Thread.currentThread().getName() + ":获得锁");
                    Thread.sleep(2000);
                    zookeeperLock1.freeLock();
                    System.out.println(Thread.currentThread().getName() + ":释放锁");
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatchThread.countDown();
                }
            };
            Thread thread = new Thread(runnable, "线程" + i);
            thread.start();
        }
        countDownLatchThread.await();
    }

}

优点:

  1. 实现比较简单,可拿来即用
  2. 有通知机制,能提供较快的响应
  3. 通过临时节点机制,保证节点能及时删掉

缺点:

有惊群效应。一个节点删除的时候,大量对这个节点的删除动作有订阅Watcher的线程会进行回调。

2、思路二:利用Zookeeper顺序节点特性来实现一个分布式锁

由于思路一实现的分布式锁有惊群效应,所以我们可以利用zookeeper顺序节点特性避免比效果。流程如下:

  1. 需要获得锁时创建顺序临时节点。
  2. 查看该节点是否为最小节点,如果是则表示获得锁,如果否则表示锁已经被别人占用,watch该节点上一个顺序节点,等待锁。
  3. 主动释放锁时删除最小节点即可。
  4. 获得锁的session超时或断开,由于锁node为临时节点则该节点也会删除。
  5. 节点删除时watch该节点的下一个节点会重新判断自己是否为最小节点,执行第2步。

代码:

public class ZookeeperLock2 implements Lock{

    private final String zookeeperLockRootNode = "zookeeperLock";
    private final String zookeeperLockPrefix = "lock_";

    private final ZooKeeper zooKeeper;
    private final String lockName;


    private class LockWatcher implements Watcher {

        // 该watch对应的需要获得锁的线程
        private final Thread thread;

        private LockWatcher(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                synchronized (thread) {
                    thread.notifyAll();
                }
            }
        }
    }

    public ZookeeperLock2(ZooKeeper zooKeeper, String lockName) throws InterruptedException, KeeperException {
        this.zooKeeper = zooKeeper;
        this.lockName = lockName;
        // 检查所有锁的根节点,如果没有则创建。
        try {
            zooKeeper.create("/" + zookeeperLockRootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                throw e;
            }
        }
    }


    @Override
    public void getLock() throws InterruptedException, KeeperException {
        String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
        try {
            zooKeeper.create(zookeeperLockNodeName, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            if (!e.code().equals(KeeperException.Code.NODEEXISTS)) {
                throw e;
            }
        }
        // 创建临时顺序节点
        String zookeeperLockSubNodeName = zooKeeper.create(zookeeperLockNodeName + "/" + zookeeperLockPrefix, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            // 获取该锁下所有子节点并排序
            List<String> zookeeperLockNodeChildren = zooKeeper.getChildren(zookeeperLockNodeName, false);
            Collections.sort(zookeeperLockNodeChildren);
            // 判断刚刚创建的节点是否为所有子节点中最小的那个,如果是则表示获得锁,如果否则表示等待锁
            if (zookeeperLockSubNodeName.equals(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0))) {
                break;
            } else {
                // 获取刚刚创建节点的上一个顺序节点
                String zookeeperLockPriorNodeName = zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0);
                for (int i = 0; i < zookeeperLockNodeChildren.size(); i++) {
                    if (zookeeperLockSubNodeName.equals(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(i))) {
                        break;
                    } else {
                        zookeeperLockPriorNodeName = zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(i);
                    }
                }
                // 监视刚刚创建节点的上一个顺序节点
                zooKeeper.exists(zookeeperLockPriorNodeName, new LockWatcher(Thread.currentThread()));
                synchronized (Thread.currentThread()) {
                    // 线程等待锁,只有在删除节点的watch中才会重新激活线程
                    Thread.currentThread().wait();
                }
            }
        }
    }

    @Override
    public void freeLock() throws KeeperException, InterruptedException {
        String zookeeperLockNodeName = "/" + zookeeperLockRootNode + "/" + lockName;
        // 获取该锁下所有子节点并排序
        List<String> zookeeperLockNodeChildren = zooKeeper.getChildren(zookeeperLockNodeName, false);
        Collections.sort(zookeeperLockNodeChildren);
        // 删除节点
        zooKeeper.delete(zookeeperLockNodeName + "/" + zookeeperLockNodeChildren.get(0), -1);
    }
}

测试:

public class ZookeeperLock2Test {

    @Test
    public void testLock() throws IOException, InterruptedException {
        final String zookeeperHost = "10.5.31.155";
        final String zookeeperPort = "2181";
        final String lockName = "testLock2";
        final int threadCnt = 9;
        final CountDownLatch countDownLatchConnect = new CountDownLatch(1);
        final ZooKeeper zooKeeper = new ZooKeeper(zookeeperHost + ":" + zookeeperPort, 60000, event -> {
            if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                countDownLatchConnect.countDown();
            }
        });
        countDownLatchConnect.await();

        final CountDownLatch countDownLatchThread = new CountDownLatch(threadCnt);
        for (int i = 1; i <= threadCnt; i++) {
            Runnable runnable = () -> {
                try {
                    ZookeeperLock2 zookeeperLock2 = new ZookeeperLock2(zooKeeper, lockName);
                    zookeeperLock2.getLock();
                    System.out.println(Thread.currentThread().getName() + ":获得锁");
                    Thread.sleep(2000);
                    zookeeperLock2.freeLock();
                    System.out.println(Thread.currentThread().getName() + ":释放锁");
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatchThread.countDown();
                }
            };
            Thread thread = new Thread(runnable, "线程" + i);
            thread.start();
        }
        countDownLatchThread.await();
    }

}

二、使用Curator实现分布式锁

其实上面写了这么多,也不过是重复造*罢了。Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。用Curator实现zookeeper的分布式锁非常简单。

public class ZookeeperLock3Test {

    @Test
    public void testLock() throws InterruptedException {
        final String zookeeperHost = "10.5.31.155";
        final String zookeeperPort = "2181";
        final int threadCnt = 9;

        final CuratorFramework zkClient = CuratorFrameworkFactory.newClient(zookeeperHost + ":" + zookeeperPort, new RetryNTimes(3, 5000));
        zkClient.start();

        CountDownLatch countDownLatch = new CountDownLatch(threadCnt);
        for (int i = 1; i <= threadCnt; i++) {
            Runnable runnable = () -> {
                InterProcessMutex zkLock = new InterProcessMutex(zkClient, "/zookeeperLock/testLock3");
                try {
                    zkLock.acquire();
                    System.out.println(Thread.currentThread().getName() + ":获得锁");
                    Thread.sleep(2000);
                    zkLock.release();
                    System.out.println(Thread.currentThread().getName() + ":释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            };
            Thread thread = new Thread(runnable, "线程" + i);
            thread.start();
        }
        countDownLatch.await();

        zkClient.close();
    }
}