在很多情况下,你的数据库不支持事务,分布式部署也使得你无法去使用JVM锁,那么这种时候,你可以考虑用分布式锁
文章目录
- 分布式锁
- 1. 实现方式
- 2. 特征
- 3. 操作
- 4. 代码改造
- 5. 测试
 
- 优化
- 1. 递归改成循环
- 2. 防止死锁
- 3. 防误删
- 4. LUA脚本 保证原子性
- 1) EVAL基本使用
- 2)代码实现
- 3) 测试
 
- 5 分布式可重入锁: lua脚本 + hash
- redis的hash数据结构
- 1) hset
- 2) hexists
- 3) hincrby
 
- 加锁优化为可重入
- 1)写成LUA脚本
- 2) 测试
 
- 解锁优化为可重入
- 1) 编写LUA脚本
- 2)测试
 
- 代码改造
- 1) 代码较复杂,将lock类单独拿出来定义使用
- 2) 工厂模式 DistributedLockClent
- 3)DistributedRedisLock
- 4) StockService
 
- 测试
- uuid优化
- 1) DistributedLockClient
- 2) DistributedRedisLock
 
 
- 6. 自动续期
- 1) Timer定时器
- 2) LUA脚本
- 3) DistributedRedisLock
- 4) 测试
 
 
- 总结
- 加锁
- 解锁
- 重试 循环/迭代
 
分布式锁
1. 实现方式
- 基于redis实现
- 基于zookeeper/etcd实现
- 基于mysql实现
2. 特征
(1) 独占排他使用 setnx
(2) 防死锁发生:
 redis客户端获取到锁后,立马宕机(设置失效时间)
 不可重入:可重入
(3) 原子性:
- 获取锁和设置过期 set key value ex 3 nx
- 判断和释放锁之间:使用 lua脚本
(4) 防误删:解铃还须系铃人,先判断再删除
(5) 可重入性:hash(key field value) + LUA脚本
(6) 自动续期:实际执行时间大于设置的失效时间
 定时任务(时间驱动Timer定时器)+ lua脚本
3. 操作
- 加锁
- 解锁
- 重试:递归、循环
简单来说,其实就类似于操作系统中的临界区-临界资源
一群线程去争抢该资源
- 经过一道阀口,只有一个线程可以抢到钥匙
- 持有钥匙的线程去执行,其他线程循环等待
- 该线程执行完成,归还钥匙,下一个抢到钥匙的线程执行
循环如此
那么代码上具体是如何操作呢
4. 代码改造
@Service
public class StockService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void deduct() {
        // 用排他锁
        Boolean flag = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        if(Boolean.FALSE.equals(flag)){
            // 如果没抢到,等50ms后,接着抢
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            deduct();
            return;
        }
        // 如果抢到了,直接执行
        try{
            // 1。 查询库存
            String stockStr = redisTemplate.opsForValue().get("stock");
            if (!StringUtil.isNullOrEmpty(stockStr)) {
                int stock = Integer.parseInt(stockStr);
                // 2。 判断条件是否满足
                if (stock > 0) {
                    // 3 更新redis
                    redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
                }
            }
        }finally {
            redisTemplate.delete("lock");
        }
    }
其实就是用一个新的redis key作为这把钥匙,
- 某个线程成功setIfAbsent,那它就拿到了这把钥匙,其他线程只能等待
- 该线程执行完成后,delete该key
- 另一个成功setIfAbsent的线程拿到钥匙,开始执行
5. 测试

 这里我直接使用了本地redis,之前连接的都是远程redis和mysql,发行并发量很低,后续都直接用本地了
 可以看出并发量很大,达到了455

 redis中库存也成功清0
优化
1. 递归改成循环
@Service
public class StockService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void deduct() {
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){
            // 如果没抢到,等50ms后,接着抢
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        // 如果抢到了,直接执行
        try{
            // 1。 查询库存
            String stockStr = redisTemplate.opsForValue().get("stock");
            if (!StringUtil.isNullOrEmpty(stockStr)) {
                int stock = Integer.parseInt(stockStr);
                // 2。 判断条件是否满足
                if (stock > 0) {
                    // 3 更新redis
                    redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
                }
            }
        }finally {
            redisTemplate.delete("lock");
        }
    }
2. 防止死锁
若抢到钥匙的线程执行过程中,redis客户端程序突然宕机,那即使重启,该钥匙也永远无法自动归还了,所以设置个过期时间
redisTemplate.opsForValue()
.setIfAbsent("lock", "111",3,TimeUnit.SECONDS)
3. 防误删
利用一个唯一标识,比如线程号/uuid,先判断是否是自己的再删除
String uuid = UUID.randomUUID().toString();
redisTemplate.opsForValue()
.setIfAbsent("lock", uuid,3,TimeUnit.SECONDS)
然后再删除的时候进行判断
// 先判断是否自己的锁,再解锁(防误删)
            if(StrUtil.equals(redisTemplate.opsForValue().get("lock"),uuid)){
                redisTemplate.delete("lock");
            }
但该删除操作并不能保证判断再删除是原子性的,在高并发情况下,
 A判断成功后,属于A的key刚好失效,
 然后B拿到了锁,最后A 删掉的是B的锁。
4. LUA脚本 保证原子性
redis单线程,执行指令遵循one-by-one。
 而LUA脚本一次性发送多个指令给redis,保证原子性。
1) EVAL基本使用
EVAL script numkeys key [key …] arg [arg …]
 script: lua脚本字符串
 numkeys: key列表的元素数量
 key列表:以空格分割 KEYS(index从1开始)
 arg列表:以空格分割 ARGV(index从1开始)
- 输出的不是print,而是return返回值

-  全局变量 a = 5 (redis不允许定义全局变量) 
-  局部变量 local a = 5 
  
-  分支控制 
 if 条件 then 代码块 elseif 条件 then 代码块 end
  
-  传递参数 KEYS[] ARGV[] 
  
-  LUA来调用redis的常见命令 get 
  
 与redis的执行命令顺序一致,如果是set
  
 非常简单,弄懂了后就可以开始使用lua脚本
// 判断是否是自己的锁,如果是自己的锁,则执行删除操作
if redis.call(‘get’, ‘lock’) == uuid
then return redis.call(‘del’,‘lock’)
else return 0
end
key: lock
argv: uuid
整理成一句lua脚本语句
EVAL " if redis.call(‘get’, KEYS[1]) == ARGV[1] then return redis.call(‘del’,KEYS[1]) else return 0 end" 1 ‘lock’ ‘1234’
简单测试一下
 
 修改下,看不是自己的lock能不能删除
 
 不是自己的lock,则不删除,ok
2)代码实现
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                    "then return redis.call('del',KEYS[1]) " +
                    "else return 0 " +
                    "end\"";
// 先判断是否自己的锁,再解锁(防误删)
redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);
注意execute参数,
 new DefaultRedisScript<>(script, Boolean.class) 脚本和运行返回类型
 Arrays.asList("lock") keys列表,所有keys放在列表中
 uuid argv参数,后续参数直接,叠加
3) 测试

 并发可达475
 
 redis也正常清零了,成功
5 分布式可重入锁: lua脚本 + hash
可参考ReentrantLock可重入锁解析锁实现分布式可重入锁
这个时候就考虑用redis的hash结构,因为你不仅只有key-value,还有个state属性,即加锁次数
redis的hash数据结构
1) hset
hset key field value

 
 hash可以简单看成一个大Map中套着个小Map <key , <key1,value> >
2) hexists
作用:测试给定key下的field是否存在。
格式:hexists key field

3) hincrby
作用:增减数值
格式:hincrby key field increment

把age+1 变成了12
加锁优化为可重入
-  
  - 判断锁是否存在(exists),若不存在则直接获取锁 hset key field value
 
-  
  - 若锁存在则判断是否是自己的锁(hexists),若是则重入:hincrby key field increment
 
-  
  - 否则重试,递归/循环
 
1)写成LUA脚本
 if redis.call('exists','lock')==0 
 then redis.call('hset','lock',uuid,1) 
	  redis.call('expire','lock',30)
 	  return 1
 elseif redis.call('hexists','lock',uuid)==0
 then redis.call('hincrby','lock',uuid,1) 
 	  redis.call('expire','lock',30)
 	  return 1
 else return 0
 end
这里可以再优化下
 
hincrby若没有key field会自动生成,此处可替换hset
 if redis.call('exists','lock')==0  or redis.call('hexists','lock',uuid)==0
 then redis.call('hincrby','lock',uuid,1) 
 	  redis.call('expire','lock',30)
 	  return 1
 else return 0
 end
再将KEYS,ARGV加入后拼凑成一句语句
if redis.call('exists',KEYS[1])==0  or redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end
2) 测试
- 先简单加锁 参数 1 “lock” “1234567” 30
  

 加锁成功
- 重入加锁 参数 1 “lock” “1234567” 30
  

- value变成了2, 而且失效时间成功更新了
- 先参数 1 “lock” “123” 30 再执行参数 1 “lock” “1234567” 30

 
 后一条加锁失败,直至行了123那条。测试成功。
解锁优化为可重入
1) 编写LUA脚本
- 判断自己的锁是否存在(hexists),不存在则返回nil
- 若自己的锁存在,则减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del)并返回1
- 不为0,返回0
LUA脚本
 if redis.call('hexists','lock',uuid) == 0
 then return nil
 elseif redis.call('hincrby', 'lock',uuid,-1)==0 
 then return redis.call('del','lock')
 else return 0
 end
把参数和key填充进去后,优化成一句语句
 if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincrby', KEYS[1],ARGV[1],-1)==0 then return redis.call('del',KEYS[1]) else return 0 end
2)测试
-  先参数 1 “lock” “1234567” 
  
 当前没有该锁,返回nil
-  用之前脚本加锁 
  
 加锁一次,解锁一次,返回1,解锁第二次,返回nil
-  多次加锁,多次解锁 
  
 两次加锁成功,返回1,解锁第一次返回0,第二次返回1。
代码改造
1) 代码较复杂,将lock类单独拿出来定义使用
DistributedRedisLock 参照ReentrantLock实现Lock接口
public class DistributedRedisLock implements Lock {
    @Override
    public void lock() {
        
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
    }
    @Override
    public boolean tryLock() {
        return false;
    }
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
    @Override
    public void unlock() {
    }
    @Override
    public Condition newCondition() {
        return null;
    }
}
2) 工厂模式 DistributedLockClent
由于后续可能还有其他分布式锁类,直接用工厂模式实现。
 将StringRedisTemplate传递,因为只有DistributedLockClent添加了@Component供其他类注入
@Component
public class DistributedLockClient {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public DistributedRedisLock getRedisLock(String key){
        return new DistributedRedisLock(redisTemplate,key);
    }
}
3)DistributedRedisLock
-加锁
	@Override
    public void lock() {
        tryLock();
    }
 	@Override
    public boolean tryLock() {
        try {
            return tryLock(-1,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
	
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
层层调用后修改tryLock即可。
/**
     * 加锁方法
     * @param time the maximum time to wait for the lock
     * @param unit the time unit of the {@code time} argument
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // 如果不是默认调用的,那么自动计算出秒数
        if(time != -1){
            expire = unit.toSeconds(time);
        }
        // 如果没有锁加锁,如果是我的锁,重入 state+1
        String script = "if redis.call('exists',KEYS[1])==0  or redis.call('hexists',KEYS[1],ARGV[1])==1 " +
                "then" +
                "   redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                "   redis.call('expire',KEYS[1],ARGV[2]) " +
                "   return 1 " +
                "else " +
                "   return 0 " +
                "end";
        // 如果return 0 即抢锁失败,则循环抢
        while (Boolean.FALSE.equals(redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(key), uuid, String.valueOf(expire)))){
            Thread.sleep(50);
        }
        return true;
    }
- 解锁
 /**
     * 解锁方法
     */
    @Override
    public void unlock() {
        // 如果不是我的锁,return nil 是我的锁减1,建减后state为0,则全部解锁完成return 1 ,不为0, 则return 0
        String script = " if redis.call('hexists',KEYS[1],ARGV[1]) == 0 " +
                "then " +
                "   return nil " +
                "elseif redis.call('hincrby', KEYS[1],ARGV[1],-1)==0 " +
                "then " +
                "   return redis.call('del',KEYS[1]) " +
                "else return 0 " +
                "end";
        Long flag = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class),Arrays.asList(key),uuid);
        if(flag == null){
            throw new IllegalMonitorStateException("this lock does not belong to you");
        }
    }
由于返回值有nil , 1, 0 所有返回类型为 Long.class,分别对应null,1,0
4) StockService
@Service
public class StockService {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Autowired
    private DistributedLockClient distributedLockClient;
    public void deduct() {
        DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
        redisLock.lock();
        try{
            // 1。 查询库存
            String stockStr = redisTemplate.opsForValue().get("stock");
            if (!StringUtil.isNullOrEmpty(stockStr)) {
                int stock = Integer.parseInt(stockStr);
                // 2。 判断条件是否满足
                if (stock > 0) {
                    // 3 更新redis
                    redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
                }
            }
        }finally {
           redisLock.unlock();
        }
    }
}
测试

 并发达到了440
 
 库存也正常清空
uuid优化
目前是每个线程都生成一个uuid,在可重入可能会出现问题,
  public void test1(){
        DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
        redisLock.lock();
        test2();
        redisLock.unlock();
        
    }
    public void test2(){
        DistributedRedisLock redisLock = distributedLockClient.getRedisLock("lock");
        redisLock.lock();
        redisLock.unlock();
    }
优化为线程id+uuid,且一个工厂类生成一个uuid的方式
1) DistributedLockClient
@Component
public class DistributedLockClient {
    @Autowired
    private StringRedisTemplate redisTemplate;
    public DistributedLockClient() {
        this.uuid = UUID.randomUUID().toString();
    }
    private String uuid;
    public DistributedRedisLock getRedisLock(String key){
        return new DistributedRedisLock(redisTemplate,key,uuid);
    }
}
2) DistributedRedisLock
public String getId(){
        return uuid + ":" + Thread.currentThread().getId();
    }
把之前的uuid使用地方,替换成getId()即可。
测试后成功。
6. 自动续期
定时任务(时间驱动 Timer定时器) + LUA脚本
1) Timer定时器
public static void main(String[] args) {
        System.out.println("定时任务开始时间" + System.currentTimeMillis());
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("定时任务执行时间" + System.currentTimeMillis());
            }
        },2000,5000);
    }

 由于它可以手动取消,所以选择了它来使用
2) LUA脚本
- 判断自己的锁是否存在(hexists),若存在则重置过期时间
if redis.call('hexists','lock',uuid) == 1 
then return redis.call('expire','lock',30)
else return 0
end
整理后
if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then return redis.call('expire',KEYS[1],ARGV[2]) else return 0 end
3) DistributedRedisLock
 private void renewExpire(){
        // 如果锁存在则续期,不存在则不续期
        String script = "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 " +
                "then " +
                "   return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "   return 0 end";
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                // 如果锁还在,则自动续期
                if(Boolean.TRUE.equals(redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(key), uuid, String.valueOf(expire)))){
                    renewExpire();
                }
            }
        },expire*1000/3);
    }
同时,修改初始化的uuid,防止定时器内部的线程id不同。
public DistributedRedisLock(StringRedisTemplate redisTemplate, String key, String uuid) {
        this.redisTemplate = redisTemplate;
        this.key = key;
        this.uuid = uuid + ":" + Thread.currentThread().getId();
    }
4) 测试

 成功,但并发较低100,很耗线程。
总结
加锁
- setnx: 独占排他 死锁、不可重入、原子性
- set k v ex 30 nx: 独占排他 死锁 不可重入
- hash + lua脚本: 可重入锁
- 判断锁是否占用,若没有占用则直接获取锁,并设置过期时间
- 若锁被占用,则判断是否当前线程占用,如果是则重入并重置过期时间
- 否则获取锁失败,在代码中重试
- Timer定时器+lua脚本:实现锁的自动续期
- 判断是否是自己的锁,若是则续期。
解锁
- del: 导致误删
- 先判断再删除,同时保持原子性: lua脚本
- hash+lua脚本:可重入
- 判断当前线程的锁是否存在,不存在则返回nil,抛出异常
- 存在则直接减1,判断减1后的值是否为0,为0则释放锁(del),并返回1
- 不为0,则返回0



















