springboot整合zookeeper实现分布式锁 01 安装并允许zookeeper 02 springboot应用配置CuratorFramework 03 使用zookeeper实现集群只一个应用实例执行定时任务 04 使用zookeeper实现分布式锁 05 使用zookeeper实现调度任务

springboot整合zookeeper实现分布式锁
01 安装并允许zookeeper
02 springboot应用配置CuratorFramework
03 使用zookeeper实现集群只一个应用实例执行定时任务
04 使用zookeeper实现分布式锁
05 使用zookeeper实现调度任务

  1. 安装jdk
  2. 官网下载zookeeper的压缩包,我这里下载的是3.4.10版本
  3. 解压后进入到zookeeper-3.4.10/conf,修改zoo_sample.cfg文件修改为zoo.cfg文件
mv zoo_sample.cfg zoo.cfg
  • 1
  1. 打开zoo.cfg文件,修改dataDir路径。修改后在/usr/local/zookeeper-3.4.10目录创建文件夹mkdir zkData
dataDir=/usr/local/zookeeper-3.4.10/zkData
  • 1
  1. 启动zookeeper
/usr/local/zookeeper-3.4.10/bin/zkServer.sh start
  • 1

02 springboot应用配置CuratorFramework

  1. 导入maven依赖
<!-- zookeeper 客户端 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
  1. 配置CuratorFramework

zookeeper的默认端口是2181

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@EnableJpaAuditing
@SpringBootApplication
public class MyDemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyDemoApplication.class, args);
	}

	@Bean
	public CuratorFramework curatorFramework() {
		return CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(5, 1000));
	}
}
  1. 启动CuratorFramework客户端
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

/**
 * 实现了ApplicationRunner接口后,当容器启动后,会执行实现的run方法
 *
 * @author 594781919
 */
@Service
public class StartService implements ApplicationRunner {

    @Autowired
    private CuratorFramework curatorFramework;

    @Autowired
    private ListenerService listenerService;

    @Override
    public void run(ApplicationArguments applicationArguments) {
		// 非常重要!!!Start the client. Most mutator methods will not work until the client is started
        curatorFramework.start();
        System.out.println("zookeeper client start");
        // 初始化监听方法
        listenerService.listener();
    }
}

03 使用zookeeper实现集群只一个应用实例执行定时任务

当我们启动多个实例时,需要其中一个实例执行定时任务,其它实例不执行。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Date;

/**
 * 实现多个应用实例只一个执行定时任务
 *
 * @author 594781919@qq.com
 */
@Service
public class TimerTaskService {

    @Autowired
    private CuratorFramework curatorFramework;

    @Value("${server.port}")
    private String port;

    @Scheduled(cron = "0/5 * * * * *")
    public void task() {
        LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/timerTask");
        try {
            leaderLatch.start();
            // Leader选举需要一些时间,等待2秒
            Thread.sleep(2000);
            // 判断是否为主节点
            if (leaderLatch.hasLeadership()) {
                System.out.println(new Date() + "    port=" + port + " 是领导");
                // 定时任务的业务逻辑代码
            } else {
                System.out.println(new Date() + "    port=" + port + " 是从属");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                leaderLatch.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

04 使用zookeeper实现分布式锁

import com.igola.domain.Employee;
import com.igola.repository.EmployeeRepository;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 594781919@qq.com
 */
@RestController
public class EmployeeController {
    @Autowired
    private EmployeeRepository employeeRepository;

    @Autowired
    private CuratorFramework curatorFramework;

    @GetMapping("/emp/save")
    public Employee save(String name) {

		// 获取锁
        InterProcessSemaphoreMutex balanceLock = new InterProcessSemaphoreMutex(curatorFramework, "/zktest" + name);
        Employee employee = new Employee();
        try {
        	// 执行加锁操作
            balanceLock.acquire();
            System.out.println("已经加锁了, name=" + name);
            employee.setName(name);
            if ("abc".equals(name)) {
                Thread.sleep(30000);
            }
            employee.setAge((int) (Math.random() * 100));
            employee.setSex(false);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
            	// 释放锁资源
                balanceLock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        employeeRepository.save(employee);

        return employee;
    }
}
  • 1
  • 2

05 使用zookeeper实现调度任务

当我们在启动多个服务后,访问了其中一个服务,执行了一些方法。然后我们需要其它服务也要执行这些方法,就需要用到NodeCache。

比如我们把一些数据缓存到Map对象中,当需要更新这个Map对象的数据时,我们就可以用NodeCache将每个服务都更新自己的Map对象。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;
import java.util.Date;

/**
 * @author 594781919
 */
@Service
public class ListenerService {
    private final CuratorFramework curatorFramework;
    private NodeCache nodeCache;

    public static final String path = "/hello/world";

    public ListenerService(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;

    }

    public void listener() {
        try {
            // 创建路径
            Stat stat = curatorFramework.checkExists().forPath(path);
            if (stat == null) {
                curatorFramework.create().creatingParentsIfNeeded().forPath(path);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        nodeCache = new NodeCache(curatorFramework, path);
        // 添加监听的路径改变后需要执行的任务
        nodeCache.getListenable().addListener(this::run);
        try {
            nodeCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("开始监听......");
    }

    @PreDestroy
    public void preDestroy() {
        CloseableUtils.closeQuietly(nodeCache);
    }

    public void notifyNodeCache() {
        try {
            curatorFramework.setData().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	// 需要执行的调度任务
    private void run() {
        System.out.println(new Date().toLocaleString() + ", 开始执行监听任务");
    }
}