title: redis解决常见的秒杀问题
date: 2025-03-07 14:24:13
tags: redis
categories: redis的应用
秒杀问题
每个店铺都可以发布优惠券,保存到 tb_voucher 表中;当用户抢购时,生成订单并保存到 tb_voucher_order 表中。
订单表如果使用数据库自增 ID,会存在以下问题:
- ID 的规律太明显,容易暴露信息。
- 单表数据量的限制,订单过多时单表很难存储得下。数据量过大后需要拆库拆表,但拆分表了之后,各表从逻辑上是同一张表,所以 id 不能一样, 于是需要保证 ID 的唯一性。
全局唯一ID
全局唯一 ID 的特点
- 唯一性:Redis 独立于数据库之外,不论有多少个数据库、多少张表,访问 Redis 获取到的 ID 可以保证唯一。
- 高可用:Redis 高可用(集群等方案)。
- 高性能:Redis 速度很快。
- 递增性:例如 String 的 INCR 命令,可以保证递增。
- 安全性:为了增加 ID 的安全性,在使用 Redis 自增数值的基础上,在拼接一些其他信息。
全局唯一 ID 的组成(存储数值类型占用空间更小,使用 long 存储,8 byte,64 bit)
-
符号位:1 bit,永远为 0,代表 ID 是正数。
-
时间戳:31 bit,以秒为单位,可以使用 69 年。
-
序列号:32 bit,当前时间戳对应的数量,也就是每秒可以对应 2^32 个不同的 ID。
Redis ID 自增策略:通过设置每天存入一个 Key,方便统计订单数量;ID 构造为 时间戳 + 计数器。
@Component
public class RedisIdWorker {
/**
* 指定时间戳(2023年1月1日 0:0:00) LocalDateTime.of(2023, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC)
*/
private static final long BEGIN_TIMESTAMP_2023 = 1672531200L;
/**
* 序列号位数
*/
private static final int BIT_COUNT = 32;
private final StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
// 1. 时间戳
long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP_2023;
// 2. 生成序列号:自增 1,Key 不存在会自动创建一个 Key。(存储到 Redis 中的 Key 为 keyPrefix:date,Value 为自增的数量)
Long serialNumber = stringRedisTemplate.opsForValue().increment(keyPrefix + ":" + DateTimeFormatter.ofPattern("yyyy-MM-dd").format(LocalDate.now()));
// 3. 时间戳左移 32 位,序列号与右边的 32 个 0 进行与运算
return timestamp << BIT_COUNT | serialNumber;
}
}
测试(300个线程生成共3w个id)
@Resource
private RedisIdWorker redisIdWorker;
public static final ExecutorService ES = Executors.newFixedThreadPool(500);
@Test
void testGloballyUniqueID() throws Exception {
// 程序是异步的,分线程全部走完之后主线程再走,使用 CountDownLatch;否则异步程序没有执行完时主线程就已经执行完了
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long globallyUniqueID = redisIdWorker.nextId("sun");
System.out.println("globallyUniqueID = " + globallyUniqueID);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
ES.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("Execution Time: " + (end - begin));
}
添加优惠卷
格式类似这种逻辑太简单了略
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\n无需预约\n可无限叠加\n不兑现、不找零\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2022-11-13T10:09:17",
"endTime":"2022-11-13T22:10:17"
}
秒杀下单功能
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1.查询优惠卷
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始,是否结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已结束!");
}
//3.判断库存是否充足
if(voucher.getStock()<=0){
return Result.fail("优惠券库存不足!");
}
//4.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).update();
//5.创建订单
if(!success){
return Result.fail("优惠券库存不足!");
}
//6.返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//7.订单写入数据库
save(voucherOrder);
//8.返回订单Id
return Result.ok(orderId);
}
超卖问题
假设库存为 1,有线程1、2、3,时刻 t1、t2、t3、t4。
- t1:线程1 查询库存,库存为 1;
- t2:线程2、线程 3 查询库存,库存为 1;
- t3:线程1 下单,库存扣减为 0。
- t4:线程2 和 线程3 下单,库存扣减为 -2。
具体图示:
解决超卖问题
悲观锁
太简单了直接加锁保证操作数据是原子操作要串行执行
乐观锁
版本号法:
一般是在数据库表中加上一个 version 字段表示 数据被修改的次数。数据被修改时 version 值加 1。
-
线程 A 读取数据,同时读取到 version 值。
-
提交更新时,若刚才读到的 version 值未发生变化:则提交更新并且 version 值加 1。
-
提交更新时,若刚才读到的 version 值发生了变化:放弃更新,并通过报错、自旋重试等方式进行下一步处理。
CAS法(简单来说就是直接拿库存当版本号):
CAS 操作需要输入两个数值,一个旧值(操作前的值)和一个新值,操作时先比较下在旧值有没有发生变化,若未发生变化才交换成新值,发生了变化则不交换。
CAS 是原子操作,多线程并发使用 CAS 更新数据时,可以不使用锁。原子操作是最小的不可拆分的操作,操作一旦开始,不能被打断,直到操作完成。也就是多个线程对同一块内存的操作是串行的。
一人一单问题
一人一单逻辑:
- 发送下单请求,提交优惠券 ID。
- 下单前需要判断:秒杀是否开始或结束、库存是否充足。
- 库存充足:根据优惠券 ID 和用户 ID 查询订单,判断该用户是否购买过该优惠券。
- 该用户对该优惠券的订单不存在时,扣减库存、创建订单、返回订单 ID。
解决并发安全问题
- 单人下单(一个用户),高并发的情况下:该用户的 10 个线程同时执行到 查询该用户 ID 和秒杀券对应的订单数量,10 个线程查询到的值都为 0,即未下单。于是会出现一个用户下 10 单的情况。
- **此处仍需加锁,乐观锁适合更新操作,插入操作需要选择悲观锁。**若直接在方法上添加 synchronized 关键字,会让锁的范围(粒度)过大,导致性能较差。因此,采用 一个用户一把锁 的方式。
问题:能否用乐观锁执行?
不能,原因是乐观锁只能操作(修改)单个变量,而创建订单需要操作数据库(难以跟踪状态)
@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {
// 判断秒杀是否开始或结束、库存是否充足。
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
ThrowUtils.throwIf(seckillVoucher == null, ErrorCode.NOT_FOUND_ERROR);
LocalDateTime now = LocalDateTime.now();
ThrowUtils.throwIf(now.isBefore(seckillVoucher.getBeginTime()), ErrorCode.OPERATION_ERROR, "秒杀尚未开始");
ThrowUtils.throwIf(now.isAfter(seckillVoucher.getEndTime()), ErrorCode.OPERATION_ERROR, "秒杀已经结束");
ThrowUtils.throwIf(seckillVoucher.getStock() < 1, ErrorCode.OPERATION_ERROR, "库存不足");
// 下单
return this.createVoucherOrder(voucherId);
}
/**
* 下单(超卖 - CAS、一人一单 - synchronized)
*/
@Override
@Transactional
public CommonResult<Long> createVoucherOrder(Long voucherId) {
// 1. 判断当前用户是否下过单
Long userId = UserHolder.getUser().getId();
Integer count = this.lambdaQuery()
.eq(VoucherOrder::getVoucherId, voucherId)
.eq(VoucherOrder::getUserId, userId)
.count();
ThrowUtils.throwIf(count > 0, ErrorCode.OPERATION_ERROR, "禁止重复下单");
// 2. 扣减库存
boolean result = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");
// 3. 下单
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setUserId(userId);
voucherOrder.setId(redisIdWorker.nextId("seckillVoucherOrder"));
voucherOrder.setVoucherId(voucherId);
result = this.save(voucherOrder);
ThrowUtils.throwIf(!result, ErrorCode.OPERATION_ERROR, "下单失败");
return CommonResult.success(voucherOrder.getId());
}
集群环境下的并发问题
分布式锁-原理
不去使用jvm内部的锁监视器,我们要在外部开一个锁监视器,让它监视所有的线程
常见的分布式锁
MySQL
:MySQL 本身带有锁机制,但是由于 MySQL 性能一般,所以采用分布式锁的情况下,使用 MySQL 作为分布式锁比较少见。
Redis
:Redis 作为分布式锁比较常见,利用 setnx 方法,如果 Key 插入成功,则表示获取到锁,插入失败则表示无法获取到锁。
Zookeeper
:Zookeeper 也是企业级开发中比较好的一个实现分布式锁的方案。
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用 MySQL 本身的互斥锁机制 | 利用 setnx 互斥命令 | 利用节点的唯一性和有序性 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开链接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开链接自动释放 |
# 添加锁(NX 互斥、EX 设置 TTL 时间)
SET lock thread1 NX EX 10
# 手动释放锁
DEL lock
public interface DistributedLock {
/**
* 获取锁(只有一个线程能够获取到锁)
* @param timeout 锁的超时时间,过期后自动释放
* @return true 代表获取锁成功;false 代表获取锁失败
*/
boolean tryLock(long timeout);
/**
* 释放锁
*/
void unlock();
}
public class SimpleDistributedLock4Redis implements DistributedLock {
private static final String KEY_PREFIX = "lock:";
private final String name;
private final StringRedisTemplate stringRedisTemplate;
public SimpleDistributedLockBased4Redis(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean tryLock(long timeout) {
String threadId = Thread.currentThread().getId().toString();
Boolean result = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeout, TimeUnit.SECONDS);
// result 是 Boolean 类型,直接返回存在自动拆箱,为防止空指针不直接返回
return Boolean.TRUE.equals(result);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
/**
* VERSION3.0 - 秒杀下单优惠券(通过分布式锁解决一人一单问题)
*/
@Override
public CommonResult<Long> seckillVoucher(Long voucherId) {
// 判断秒杀是否开始或结束、库存是否充足。
...
// 下单
SimpleDistributedLock4Redis lock = new SimpleDistributedLock4Redis("order:" + UserHolder.getUser().getId(), stringRedisTemplate);
boolean tryLock = lock.tryLock(TTL_TWO);
ThrowUtils.throwIf(!tryLock, ErrorCode.OPERATION_ERROR, "禁止重复下单");
try {
VoucherOrderService voucherOrderService = (VoucherOrderService) AopContext.currentProxy();
return voucherOrderService.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}
误删问题
# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01
# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02
# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁,但是释放的是线程 2 获取到的锁。(线程 2:你 TM 放我锁是吧!)
del lock:order:1
# 线程 3 获取到锁(此时线程 2 和 3 并行执行业务)
setnx lock:order:1 thread03
解决方案:在线程释放锁时,判断当前这把锁是否属于自己,如果不属于自己,就不会进行锁的释放(删除)。
# 线程 1 获取到锁后执行业务,碰到了业务阻塞。
setnx lock:order:1 thread01
# 业务阻塞的时间超过了该锁的 TTL 时间,触发锁的超时释放。超时释放后,线程 2 获取到锁并执行业务。
setnx lock:order:1 thread02
# 线程 2 执行业务的过程中,线程 1 的业务执行完毕并且释放锁。但是线程 1 需要判断这把锁是否属于自己,不属于自己就不会释放锁。
# 于是线程 2 一直持有这把锁直到业务执行结束后才会释放,并且在释放时也需要判断当前要释放的锁是否属于自己。
del lock:order:1
# 线程 3 获取到锁并执行业务
setnx lock:order:1 thread03
基于 Redis 的分布式锁的实现(解决误删问题)
-
相较于最开始分布式锁的实现,只需要增加一个功能:释放锁时需要判断当前锁是否属于自己。(而集群环境下不同 JVM 中的线程 ID 可能相同,增加一个 UUID 区分不同 JVM)
-
因此通过分布式锁存入 Redis 中的线程标识包括:UUID (服务器id)+ 线程 ID(线程id)。UUID 用于区分不同服务器中线程 ID 相同的线程,线程 ID 用于区分相同服务器的不同线程。
public class SimpleDistributedLockBasedOnRedis implements DistributedLock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleDistributedLockBasedOnRedis(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; // ID_PREFIX 在当前 JVM 中是不变的,主要用于区分不同 JVM private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; /** * 获取锁 */ @Override public boolean tryLock(long timeoutSeconds) { // UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。 String threadIdentifier = ID_PREFIX + Thread.currentThread().getId(); Boolean isSucceeded = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadIdentifier, timeoutSeconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(isSucceeded); } /** * 释放锁(释放锁前通过判断 Redis 中的线程标识与当前线程的线程标识是否一致,解决误删问题) */ @Override public void unlock() { // UUID 用于区分不同服务器中线程 ID 相同的线程;线程 ID 用于区分同一个服务器中的线程。 String threadIdentifier = THREAD_PREFIX + Thread.currentThread().getId(); String threadIdentifierFromRedis = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 比较 Redis 中的线程标识与当前的线程标识是否一致 if (!StrUtil.equals(threadIdentifier, threadIdentifierFromRedis)) { throw new BusinessException(ErrorCode.OPERATION_ERROR, "释放锁失败"); } // 释放锁标识 stringRedisTemplate.delete(KEY_PREFIX + name); } }
用Lua脚本解决原子性问题
分布式锁的原子性问题
-
线程 1 获取到锁并执行完业务,判断锁标识一致后释放锁,释放锁的过程中阻塞,导致锁没有释放成功,并且阻塞的时间超过了锁的 TTL 释放,导致锁自动释放。
-
此时线程 2 获取到锁,执行业务;在线程 2 执行业务的过程中,线程 1 完成释放锁操作。
-
之后,线程 3 获取到锁,执行业务,又一次导致此时有两个线程同时在并行执行业务。
因此,需要保证 unlock()
方法的原子性,即判断线程标识的一致性和释放锁这两个操作的原子性。
Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保 Redis 多条命令执行时的原子性。
unlock操作
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{//写成静态代码块,类加载就可以完成初始定义,就不用每次释放锁都去加载这个,性能提高咯
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));//设置脚本位置
UNLOCK_SCRIPT.setResultType(Long.class);
}
public void unlock(){
//调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
Lua脚本
-- 锁的key
-- local key = KEYS[1]
-- 当前线程标识
-- local threadId = ARGV[1]
-- 获取锁中的线程标识
if(redis.call('get',KEYS[1]) == ARGV[1]) then return redis.call('del',KEYS[1])
end return 0