在分布式系统中,确保数据一致性和操作的正确执行是至关重要的。PmHub项目中,通过集成Redis分布式锁来保障流程状态更新,这是一个非常关键的技术点,以下将详细介绍其原理、实现。
1 本地锁的问题
1.1 常见的本地锁
在Java中,常见的本地锁有两种,分别是synchronized锁和Lock锁。
在单机环境下,synchronized锁是Java提供的一种内置锁,在单个JVM进程中提供线程之间的锁定机制,可控制多线程并发。
1.2 为什么单机锁锁不住
然而,在分布式系统中,当流程服务部署了多个实例且位于不同服务器时,synchronized锁不再适用。
- 假如现在有多个流程服务实例,那么synchronized这种单机锁的锁对象是字节码文件(或者是Java对象实例),那么其实都会在每一个流程服务中存在一个锁对象。
- 现在假如浏览器往网关发起了若干请求,由于网关中是动态路由,那么就会把这些请求路由给各个流程服务。又由于商品服务中对访问数据库进行了加锁,锁对象是当前类的字节码文件,在每一个流程服务中,都会有这个类的字节码文件存在。
- 也就说,本地锁,在这种场景下,只能保证每一个流程服务同时只有一个线程访问数据库,不能保证数据库在某一个时刻,只有一个线程访问。
2 分布式锁的概念
基于本地锁在分布式环境中的问题,需要一种能在分布式集群环境下的锁机制。分布式锁是控制分布式系统不同进程访问共享资源的一种锁机制,能确保同一时刻只有一个访问可以调用,避免多个调用者竞争调用和数据不一致问题。
所谓分布式锁,其实就是锁对象不在服务实例中,而是在服务实例外部。
3 分布式锁的特性
- 互斥性:同一时刻只能一个节点服务拥有该锁,不同节点之间具有互斥性。
- 超时机制:为防止锁变成死锁,需要设置锁的超时时间。正常情况下,请求获取锁后处理任务并释放锁;若发生服务异常或网络异常导致锁无法释放,过了超时时间后,锁自动释放,其他请求能正常获取锁。
- 自动续期:锁设置了超时机制后,若持有锁的节点处理任务时间过长超过了超时时间,会发生线程未处理完任务锁就被释放的情况,其他线程就能获取到该锁,导致多个节点同时访问共享资源。因此,需要开启一个监听线程,定时监听任务,若任务线程存活则延长超时时间;当任务完成或发生异常则不继续延长超时时间。
4 分布式锁的实现方式
4.1 分布式锁的实现
分布式锁主要有三种实现方式:数据库、Zookeeper、Redis。其中,Redis实现分布式锁性能最高。Redis实现分布式锁有两种方式,一种是利用Redis的SetNX,但存在死锁问题;线上多采用Redission实现分布式锁。
4.2 Radission分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid),不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson基于netty通信框架实现,支持非阻塞通信,性能优于Jedis。
Redisson分布式锁具有四层保护:防死锁、防误删、可重入、自动续期。同时,Redisson实现Redis分布式锁,支持单机和集群模式。
5 PmHub项目实战
5.1 业务流程
PmHub分布式锁业务流程如下:
- 项目服务操作
- 首先由用户发起操作,在项目服务中依次进行“新建项目”和“新建任务” 。
- 接着进行“任务审批设置” ,在此环节,若涉及更新设置操作,需等待分布式锁释放才能执行。
- 流程服务操作
- 当项目服务完成“任务审批设置”后,流程服务开始介入,先执行“更新审批流程” 。
- 然后依次进行“流程状态通知”和“流程状态回调” ,整个过程中分布式锁起到协调和控制资源访问的作用,避免并发操作带来的数据不一致等问题 。
5.2 实现步骤
5.2.1 添加依赖
在PmHub项目中,添加依赖pmhub-base/pmhub-base-security/pom.xml
<!-- Redisson 分布式锁功能 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.2</version>
</dependency>
5.2.2 添加配置
并在nacos的pmhub-workflow中添加配置,配置Redisson单机模式。
# Redisson 分布式锁配置
redisson:
codec: org.redisson.codec.JsonJacksonCodec
threads: 4
netty:
threads: 4
single-server-config:
address: "redis://localhost:6379"
password: null
database: 0
5.2.3 读取配置
添加RedissionConfig配置类来读取配置文件:com.laigeoffer.pmhub.base.security.config.RedissonConfig
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private int redisPort;
@Value("${spring.redis.password:}") // 如果没有密码,默认值为空
private String redisPassword;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisHost + ":" + redisPort)
.setPassword(redisPassword.isEmpty() ? null : redisPassword)
.setDatabase(0);
return Redisson.create(config);
}
}
5.2.4 定义ILock锁对象
com.laigeoffer.pmhub.base.security.pojo.ILock
@AllArgsConstructor
public class ILock implements AutoCloseable {
/**
* 持有的锁对象
*/
@Getter
private Object lock;
/**
* 分布式锁接口
*/
@Getter
private IDistributedLock distributedLock;
@Override
public void close() throws Exception {
if (Objects.nonNull(lock)) {
distributedLock.unLock(lock);
}
}
}
5.2.5 定义IDistributedLock分布式锁接口及其实现类
com.laigeoffer.pmhub.base.security.service.redisson.IDistributedLock
public interface IDistributedLock {
/**
* 获取锁,默认30秒失效,失败一直等待直到获取锁
*
* @param key 锁的key
* @return 锁对象
*/
ILock lock(String key);
/**
* 获取锁,失败一直等待直到获取锁
*
* @param key 锁的key
* @param lockTime 加锁的时间,超过这个时间后锁便自动解锁; 如果lockTime为-1,则保持锁定直到显式解锁
* @param unit {@code lockTime} 参数的时间单位
* @param fair 是否公平锁
* @return 锁对象
*/
ILock lock(String key, long lockTime, TimeUnit unit, boolean fair);
/**
* 尝试获取锁,30秒获取不到超时异常,锁默认30秒失效
*
* @param key 锁的key
* @param tryTime 获取锁的最大尝试时间
* @return
* @throws Exception
*/
ILock tryLock(String key, long tryTime) throws Exception;
/**
* 尝试获取锁,获取不到超时异常
*
* @param key 锁的key
* @param tryTime 获取锁的最大尝试时间
* @param lockTime 加锁的时间
* @param unit {@code tryTime @code lockTime} 参数的时间单位
* @param fair 是否公平锁
* @return
* @throws Exception
*/
ILock tryLock(String key, long tryTime, long lockTime, TimeUnit unit, boolean fair) throws Exception;
/**
* 解锁
*
* @param lock
* @throws Exception
*/
void unLock(Object lock);
/**
* 释放锁
*
* @param lock
* @throws Exception
*/
default void unLock(ILock lock) {
if (lock != null) {
unLock(lock.getLock());
}
}
}
com.laigeoffer.pmhub.base.security.service.redisson.RedissonDistributedLock
@Slf4j
@Component
public class RedissonDistributedLock implements IDistributedLock {
@Resource
private RedissonClient redissonClient;
/**
* 统一前缀
*/
@Value("${redisson.lock.prefix:bi:distributed:lock}")
private String prefix;
@Override
public ILock lock(String key) {
return this.lock(key, 0L, TimeUnit.SECONDS, false);
}
@Override
public ILock lock(String key, long lockTime, TimeUnit unit, boolean fair) {
RLock lock = getLock(key, fair);
// 获取锁,失败一直等待,直到获取锁,不支持自动续期
if (lockTime > 0L) {
lock.lock(lockTime, unit);
} else {
// 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
lock.lock();
}
return new ILock(lock, this);
}
@Override
public ILock tryLock(String key, long tryTime) throws Exception {
return this.tryLock(key, tryTime, 0L, TimeUnit.SECONDS, false);
}
@Override
public ILock tryLock(String key, long tryTime, long lockTime, TimeUnit unit, boolean fair)
throws Exception {
RLock lock = getLock(key, fair);
boolean lockAcquired;
// 尝试获取锁,获取不到超时异常,不支持自动续期
if (lockTime > 0L) {
lockAcquired = lock.tryLock(tryTime, lockTime, unit);
} else {
// 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s
lockAcquired = lock.tryLock(tryTime, unit);
}
if (lockAcquired) {
return new ILock(lock, this);
}
return null;
}
/**
* 获取锁
*
* @param key
* @param fair
* @return
*/
private RLock getLock(String key, boolean fair) {
RLock lock;
String lockKey = prefix + ":" + key;
if (fair) {
// 获取公平锁
lock = redissonClient.getFairLock(lockKey);
} else {
// 获取普通锁
lock = redissonClient.getLock(lockKey);
}
return lock;
}
@Override
public void unLock(Object lock) {
if (!(lock instanceof RLock)) {
throw new IllegalArgumentException("Invalid lock object");
}
RLock rLock = (RLock) lock;
if (rLock.isLocked()) {
try {
rLock.unlock();
} catch (IllegalMonitorStateException e) {
log.error("释放分布式锁异常", e);
}
}
}
}
5.2.6 定义DistributedLock注解
com.laigeoffer.pmhub.base.security.annotation.DistributedLock
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
/**
* 保证业务接口的key的唯一性,否则失去了分布式锁的意义 锁key
* 支持使用spEl表达式
*/
String key();
/**
* 保证业务接口的key的唯一性,否则失去了分布式锁的意义 锁key 前缀
*/
String keyPrefix() default "";
/**
* 是否在等待时间内获取锁,如果在等待时间内无法获取到锁,则返回失败
*/
boolean tryLok() default false;
/**
* 获取锁的最大尝试时间 ,会尝试tryTime时间获取锁,在该时间内获取成功则返回,否则抛出获取锁超时异常,tryLok=true时,该值必须大于0。
*
*/
long tryTime() default 0;
/**
* 加锁的时间,超过这个时间后锁便自动解锁
*/
long lockTime() default 30;
/**
* tryTime 和 lockTime的时间单位
*/
TimeUnit unit() default TimeUnit.SECONDS;
/**
* 是否公平锁,false:非公平锁,true:公平锁
*/
boolean fair() default false;
}
5.2.7 AOP切面控制
com.laigeoffer.pmhub.base.security.aspect.DistributedLockAspect
@Aspect
@Slf4j
@Component
public class DistributedLockAspect {
@Resource
private IDistributedLock distributedLock;
/**
* SpEL表达式解析
*/
private SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
/**
* 用于获取方法参数名字
*/
private DefaultParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();
@Pointcut("@annotation(com.laigeoffer.pmhub.base.security.annotation.DistributedLock)")
public void distributorLock() {
}
@Around("distributorLock()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
// 获取DistributedLock
DistributedLock distributedLock = this.getDistributedLock(pjp);
// 获取 lockKey
String lockKey = this.getLockKey(pjp, distributedLock);
ILock lockObj = null;
try {
// 加锁,tryLok = true,并且tryTime > 0时,尝试获取锁,获取不到超时异常
if (distributedLock.tryLok()) {
if(distributedLock.tryTime() <= 0){
throw new UtilException("tryTime must be greater than 0");
}
lockObj = this.distributedLock.tryLock(lockKey, distributedLock.tryTime(), distributedLock.lockTime(), distributedLock.unit(), distributedLock.fair());
} else {
lockObj = this.distributedLock.lock(lockKey, distributedLock.lockTime(), distributedLock.unit(), distributedLock.fair());
}
if (Objects.isNull(lockObj)) {
throw new UtilException("Duplicate request for method still in process");
}
return pjp.proceed();
} catch (Exception e) {
throw e;
} finally {
// 解锁
this.unLock(lockObj);
}
}
/**
* @param pjp
* @return
* @throws NoSuchMethodException
*/
private DistributedLock getDistributedLock(ProceedingJoinPoint pjp) throws NoSuchMethodException {
String methodName = pjp.getSignature().getName();
Class clazz = pjp.getTarget().getClass();
Class<?>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes();
Method lockMethod = clazz.getMethod(methodName, par);
DistributedLock distributedLock = lockMethod.getAnnotation(DistributedLock.class);
return distributedLock;
}
/**
* 解锁
*
* @param lockObj
*/
private void unLock(ILock lockObj) {
if (Objects.isNull(lockObj)) {
return;
}
try {
this.distributedLock.unLock(lockObj);
} catch (Exception e) {
log.error("分布式锁解锁异常", e);
}
}
/**
* 获取 lockKey
*
* @param pjp
* @param distributedLock
* @return
*/
private String getLockKey(ProceedingJoinPoint pjp, DistributedLock distributedLock) {
String lockKey = distributedLock.key();
String keyPrefix = distributedLock.keyPrefix();
if (StringUtils.isBlank(lockKey)) {
throw new UtilException("Lok key cannot be empty");
}
if (lockKey.contains("#")) {
this.checkSpEL(lockKey);
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
// 获取方法参数值
Object[] args = pjp.getArgs();
lockKey = getValBySpEL(lockKey, methodSignature, args);
}
lockKey = StringUtils.isBlank(keyPrefix) ? lockKey : keyPrefix + lockKey;
return lockKey;
}
/**
* 解析spEL表达式
*
* @param spEL
* @param methodSignature
* @param args
* @return
*/
private String getValBySpEL(String spEL, MethodSignature methodSignature, Object[] args) {
// 获取方法形参名数组
String[] paramNames = nameDiscoverer.getParameterNames(methodSignature.getMethod());
if (paramNames == null || paramNames.length < 1) {
throw new UtilException("Lok key cannot be empty");
}
Expression expression = spelExpressionParser.parseExpression(spEL);
// spring的表达式上下文对象
EvaluationContext context = new StandardEvaluationContext();
// 给上下文赋值
for (int i = 0; i < args.length; i++) {
context.setVariable(paramNames[i], args[i]);
}
Object value = expression.getValue(context);
if (value == null) {
throw new UtilException("The parameter value cannot be null");
}
return value.toString();
}
/**
* SpEL 表达式校验
*
* @param spEL
* @return
*/
private void checkSpEL(String spEL) {
try {
ExpressionParser parser = new SpelExpressionParser();
parser.parseExpression(spEL, new TemplateParserContext());
} catch (Exception e) {
log.error("spEL表达式解析异常", e);
throw new UtilException("Invalid SpEL expression [" + spEL + "]");
}
}
}
5.2.8 定义分布式锁启动元注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({DistributedLockAspect.class})
public @interface EnableDistributedLock {
}
5.3 实战使用
5.3.1 启用分布式锁
com.laigeoffer.pmhub.workflow.PmHubWorkflowApplication
5.3.2 更新审批设置接口添加注解
在具体业务场景中,如更新项目和任务审批设置时,为保证同一时刻只有一个服务能拿到锁,在WfDeployController的updateApprovalSet方法中启用分布式锁并添加注解。
com.laigeoffer.pmhub.workflow.controller.WfDeployController#updateApprovalSet
/**
* 更新审批设置
* @param approvalSetDTO
* @return
*/
@InnerAuth
@PostMapping("/updateApprovalSet")
@DistributedLock(key = "#approvalSetDTO.approved", lockTime = 10L, keyPrefix = "workflow-approve-")
public R<?> updateApprovalSet(@RequestBody ApprovalSetDTO approvalSetDTO) {
return R.ok(deployService.updateApprovalSet(approvalSetDTO, ProjectStatusEnum.PROJECT.getStatusName()));
}
6 总结
本文围绕PmHub项目,先对比本地锁和分布式锁,阐述分布式锁特性。实现方式中Redis性能高,Redisson有四层保护。项目实战从添加依赖到切面控制,最后在具体接口添加注解,保证数据一致性和操作正确执行。
7 参考链接
- PmHub集成Redis分布式锁保障流程状态更新
- 项目仓库(GitHub)
- 项目仓库(码云): (国内访问速度更快)