利用redis实现分布式锁(2) ——具体实现方式一
利用redis实现分布式锁(二) ——具体实现方式一
上一篇介绍了分布式锁的概念、作用、基本原理(http://guwq2014.iteye.com/blog/2365658),
这一篇看看如何使用redis实现一个分布式锁:
第一步:分布式锁实现类:
import redis.clients.jedis.ShardedJedis; import com.suning.framework.sedis.ShardedJedisAction; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; /** * 基于redis实现的分布式锁 * * @author guweiqiang */ public class DistributedSedisLock { private ShardedJedisClientImpl jedisClient; // jedis client private String lockKey; // 锁的redis key private int expireMsecs = 60 * 1000; // 锁超时,防止线程在入锁以后,无限的执行等待 private int timeoutMsecs = 10 * 1000; // 锁等待,防止线程饥饿 private boolean locked = false;// 拿到锁的标示:true表示拿到了锁 private static final long DEFAULT_SLEEP_TIME = 100; // 线程睡眠时间100毫秒 /*********************构造方法 start************************************/ public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey) { this.jedisClient = jedisClient; this.lockKey = lockKey; } public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey, int timeoutMsecs) { this(jedisClient, lockKey); this.timeoutMsecs = timeoutMsecs; } public DistributedSedisLock(ShardedJedisClientImpl jedisClient, String lockKey, int timeoutMsecs, int expireMsecs) { this(jedisClient, lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } /*********************构造方法 end************************************/ public String getLockKey() { return lockKey; } /** * 判断是否拿到了锁(对外提供的获取锁的方法) * @return true:拿到了锁;false:没有拿到锁 * @throws InterruptedException */ public synchronized boolean acquire() throws InterruptedException { return acquire(jedisClient); } /** * 判断是否拿到了锁 * @param redisClient * @return true:拿到了锁;false:没有拿到锁 * @throws InterruptedException */ private synchronized boolean acquire(ShardedJedisClientImpl jedisClient) throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; final String expiresStr = String.valueOf(expires); // 锁到期时间 // 加锁 Long setnxResult = jedisClient.execute(new ShardedJedisAction<Long>() { public Long doAction(ShardedJedis jedis) { return jedis.setnx(lockKey, expiresStr); } }); if (setnxResult!=null && setnxResult.intValue()==1) { // setnx返回1,表示设置成功 // lock acquired success locked = true; return true; } // 获取redis里的时间 String currentValueStr = jedisClient.execute(new ShardedJedisAction<String>() { public String doAction(ShardedJedis jedis) { return jedis.get(lockKey); } }); if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { // 判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的 // lock is expired String oldValueStr = jedisClient.execute(new ShardedJedisAction<String>() { public String doAction(ShardedJedis jedis) { return jedis.getSet(lockKey, expiresStr); } }); // 获取上一个锁到期时间,并设置现在的锁到期时间, // 只有一个线程才能获取上一个线程的设置时间,因为jedis.getSet是同步的(原子的) if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { // 如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁 // lock acquired locked = true; return true; } } timeout -= DEFAULT_SLEEP_TIME; Thread.sleep(DEFAULT_SLEEP_TIME); } return false; } /** * 释放锁(对外提供的释放锁的方法) */ public synchronized void release() { release(jedisClient); } /** * 释放锁 */ private synchronized void release(ShardedJedisClientImpl jedisClient) { if (locked) { jedisClient.execute(new ShardedJedisAction<Long>() { public Long doAction(ShardedJedis jedis) { return jedis.del(lockKey); } }); locked = false; } } }
第二步:对外暴露的使用工具类:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; /** * 分布式锁使用工具类 * * @author guweiqiang */ public class DistributedLock { private static final Logger LOGGER = LoggerFactory.getLogger(DistributedLock.class.getName()); private DistributedSedisLock distributedSedisLock; // 分布式锁 private static ShardedJedisClientImpl jedisClient; // jedis client private String lockKey; // 锁的redis key private int expireMsecs; // 锁超时,防止线程在入锁以后,无限的执行等待 private int timeoutMsecs; // 锁等待,防止线程饥饿 public DistributedLock(String lockKey){ this(lockKey, 3000, 300000); } public DistributedLock(String lockKey, int timeoutMsecs, int expireMsecs){ this.lockKey = "YEB:BYUTIL:SHP:LOCK:" + lockKey; this.timeoutMsecs = timeoutMsecs; this.expireMsecs = expireMsecs; this.distributedSedisLock = new DistributedSedisLock(jedisClient, this.lockKey.intern(), timeoutMsecs, expireMsecs); } /** * 线程包装 */ public void wrap(Runnable runnable){ long begin = System.currentTimeMillis(); try { // timeout超时,等待入锁的时间,设置为3秒;expiration过期,锁存在的时间设置为5分钟 LOGGER.info("begin logck,lockKey={},timeoutMsecs={},expireMsecs={}", lockKey, timeoutMsecs, expireMsecs); if(distributedSedisLock.acquire()){ // 拿到了锁,执行线程任务 runnable.run(); } else { LOGGER.info("The time wait for lock more than [{}] ms ", timeoutMsecs); } } catch(Exception e){ LOGGER.error("acquire lock Exception ", e); } finally { LOGGER.info("[{}]cost={}", lockKey, System.currentTimeMillis() - begin); // 释放锁 if(distributedSedisLock!=null){ distributedSedisLock.release(); } } } /** * 初始化jedisClient * @param jedisClient */ public static synchronized void setShardedJedisClient(ShardedJedisClientImpl jedisClient) { DistributedLock.jedisClient = jedisClient; } }
在配置一个监听器,用来初始化jedis client(使用其他方式进行初始化也可以):
import javax.servlet.ServletContextEvent; import org.springframework.context.ApplicationContext; import org.springframework.web.context.ContextLoaderListener; import com.suning.framework.sedis.impl.ShardedJedisClientImpl; import com.suning.shp.utils.DistributedLock; /** * 启动监听器 * * @author guweiqiang */ public class SystemListener extends ContextLoaderListener { private ApplicationContext applicationContext; @Override public void contextInitialized(ServletContextEvent event) { super.contextInitialized(event); /************* spring *********/ applicationContext = super.getCurrentWebApplicationContext(); /**** redis 分布式锁 *****/ DistributedLock.setShardedJedisClient(applicationContext.getBean("jedisClient", ShardedJedisClientImpl.class)); } @Override public void contextDestroyed(ServletContextEvent event) { super.contextDestroyed(event); } }
监听器写好之后,需要在web.xml里配置一下:
<listener> <listener-class>com.suning.shp.listener.SystemListener</listener-class> </listener>
至此,一个基于redis实现的分布式锁就可以使用了,使用方法如下:
DistributedLock lock = new DistributedLock(key, 10000, 5000); try { lock.wrap(new Runnable(){ @Override public void run() { // 这里写需要加分布式锁的业务代码 } }); } catch (Exception e){ LOGGER.error("发生异常:" + e.getMessage(), e); }