Redis高并发分布式锁实战
1.分布式场景下的synchronized失效的问题–用redis实现分布式锁
synchronized是通过monitor实现的jvm级别的锁,如果是分布式系统,跑在不同的虚拟机上的tomcat上,会导致synchronized无法锁住对象 ----------- 需要分布式锁 redis
SET、SETEX、SETNX
SET key value
 含义:
     SET KEY value  V-K
    相同的K 后写的覆盖先写的

SETEX key seconds value
 该命令相当于将下面两行操作合并为一个原子操作
SET key value
 EXPIRE key seconds # 设置生存时间
 含义(setex = set expire):
          SET KEY value  V-K 设置生命周期 
    相同的K 后写的覆盖先写的


SETNX key value
 含义(setnx = SET if Not eXists):
       SET KEY value  V-K  ,key 不存在返回1 表示成功,key存在返回0
       SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
返回值:
       设置成功,返回 1 。
       设置失败,返回 0 。

2.redis实现分布式锁
@RequestMapping("/redis-001")
public String redis001() {
    String key = "redis-001";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
    if (!result) {
        return "error_code";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        int realStock = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", realStock + "");
        System.out.println("扣减成功,剩余库存:" + realStock);
    } else {
        System.out.println("扣减失败,库存不足");
    }
    stringRedisTemplate.delete(key);
    return "end";
}
在 stringRedisTemplate.delete(key) 释放锁之前会有业务代码块,若出现异常抛出,则不能执行关锁的代码块
@RequestMapping("/redis-001")
public String redis001() {
    String key = "redis-001";
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
    if (!result) {
        return "error_code";
    }
    try {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }
    } finally {
        stringRedisTemplate.delete(key);
    }
    return "end";
}
在程序执行的任意时刻都有可能应为不可抗力因素突然终止,重启、宕机导致不能执行到finally代码块,所以必须要设置超时时间
    @RequestMapping("/redis-001")
    public String redis001() {
        String key = "redis-001";
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
        stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
        if (!result) {
            return "error_code";
        }
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            stringRedisTemplate.delete(key);
        }
        return "end";
    }
redis设置的时候需要保证原子性
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
解决方案
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, clintId,2000 ,TimeUnit.MILLISECONDS);
若T1的锁设置失效时间20s,但是T1执行20s没有完成,此时T2可以获得锁,T1执行会在finally代码块中释放T2加的锁
    @RequestMapping("/redis-001")
    public String redis001() {
        String key = "redis-001";
        //不满足原子性
//        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001");
//        stringRedisTemplate.expire(key,2000 ,TimeUnit.MILLISECONDS);
        String clintId = UUID.randomUUID().toString();
        //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "redis-001",2000 ,TimeUnit.MILLISECONDS);
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, clintId,2000 ,TimeUnit.MILLISECONDS);
        if (!result) {
            return "error_code";
        }
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            if (clintId.equals(stringRedisTemplate.opsForValue().get(key))){
                 stringRedisTemplate.delete(key);
            }
        }
        return "end";
    }
finally {
            if (clintId.equals(stringRedisTemplate.opsForValue().get(key))){
                 stringRedisTemplate.delete(key);
            }
        }
上述代码来确定是否是自己的锁,是没有原子性的,使用redisson(lua)来解决这个问题
Redisson

Redis 自 2.6 版本开始支持 Lua 脚本,是将所有操作打包成原子操作的一种机制。使用 Lua 脚本可以对 Redis 数据库进行复杂的操作,比如多个命令组合执行、避免分布式事务中的竞态条件等。
Lua 脚本在 Redis 中的原理是:将脚本发送到 Redis 服务器时,Redis 会先对脚本进行语法检查和编译,然后将编译后的字节码缓存起来并返回一个 SHA1 校验和。之后客户端每次需要执行这个脚本时,只需要将 SHA1 校验和发送给 Redis 服务器,Redis 通过校验和即可直接获取缓存中的字节码,避免了每次解析和编译 Lua 脚本的开销。
Lua 脚本的好处有以下几点:
-  原子性:Lua 脚本是 Redis 支持的最完整的事务形式,因为它们在 Redis 服务器上作为一个单独的脚本条目执行,因此能够保证所有操作的原子性。 
-  灵活性:Lua 脚本方便对于 Redis 数据库进行复杂的操作,比如批量操作等。 
-  性能:由于 Redis 会对 Lua 脚本进行预编译并缓存字节码,因此当相同的脚本被多次执行时,可以避免每次解析和编译脚本带来的开销。而且,Lua 脚本在 Redis 服务器中以单线程运行,相比于多线程,这样可以减少线程切换、锁等开销。  
总之,Redis Lua 脚本具有良好的性能、灵活性和原子性,使得 Redis 支持更复杂、更安全地操作数据,提高了 Redis 在实际应用中的可靠性和稳定性。
Lua脚本语法


Redisson.lock()

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
getName() //RLock redissonLock = redisson.getLock(lockKey);
internalLockLeaseTime //internalLockLeaseTime = unit.toMillis(leaseTime);->
          RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN); //getLockWatchdogTimeout() 默认30s 
 getLockName(threadId)  //final UUID id + threadId
  
设置锁成功如何执行看门狗机制实现续命
 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                Long ttlRemaining = future.getNow(); //成功是nil 即null
                // lock acquired
                if (ttlRemaining == null) {
                  	//成功一定进入的代码块
                    scheduleExpirationRenewal(threadId);
                }
            }
        });


没有获得锁的线程自旋等待,间歇性尝试加锁

getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); //是阻塞等待不会占用资源
ttl时间内如果当前获得锁的线程执行完成了怎么办,订阅模式

订阅的内容将在删除的时候更新


unlock使用了lua代码保证了原子操作


















