前言
Sentinel有pull(拉)模式,和push(推)模式。本文是使用reids实现pull模式。
通过SPI机制引入自己的类
在项目的 resources > META-INF > services下创建新文件,文件名如下,内容是自己实现类的全限定名:com.xx.sentinel.RedisDataSourceInit
 
创建实现类
package com.xx.sentinel;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import com.alibaba.csp.sentinel.init.InitFunc;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.csp.sentinel.transport.util.WritableDataSourceRegistry;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.xx.schedule.Utils.SpringUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.List;
/**
 * @Author: wangyn85 <br>
 * @CreateDate: 2023/07/01 11:14 <br>
 * @Description: sentinel初始化方法,进行持久化设置
 */
public class RedisDataSourceInit implements InitFunc {
    /**
     * sentinel存放redis的key。例如:sentinel:common:flow
     */
    private static final String SENTINEL_REDIS_KEY = "sentinel:%s:%s";
    /**
     * sentinel数据更新发布订阅频道。例如:chanel_sentinel_common_flow
     */
    private static final String SENTINEL_REDIS_CHANEL = "chanel_sentinel_%s_%s";
    private RedisTemplate redisTemplate;
    private static final String RULE_FLOW = "flow";
    private static final String RULE_DEGRADE = "degrade";
    /**
     * 给模板对象RedisTemplate赋值,并传出去
     */
    private RedisTemplate<String, Object> getRedisTemplate() {
        if (redisTemplate == null) {
            synchronized (this) {
                if (redisTemplate == null) {
                    redisTemplate = SpringUtil.getBean("functionDomainRedisTemplate");
                }
            }
        }
        return redisTemplate;
    }
    /**
     * 获取sentinel存放redis的key
     *
     * @param ruleType
     * @return
     */
    private String getSentinelRedisKey(String ruleType) {
        String projectName = SentinelRedisHelper.getProjectName();
        return String.format(SENTINEL_REDIS_KEY, projectName, ruleType);
    }
    /**
     * 获取sentinel数据更新发布订阅频道
     *
     * @param ruleType
     * @return
     */
    private String getSentinelRedisChanel(String ruleType) {
        String projectName = SentinelRedisHelper.getProjectName();
        return String.format(SENTINEL_REDIS_CHANEL, projectName, ruleType);
    }
    @Override
    public void init() throws Exception {
        // 没有配置redis或没有配置projectName则不进行持久化配置
        if (getRedisTemplate() == null || StringUtils.isEmpty(SentinelRedisHelper.getProjectName())) {
            return;
        }
        // 1.处理流控规则
        this.dealFlowRules();
        // 2.处理熔断规则
        this.dealDegradeRules();
    }
    /**
     * 处理流控规则
     */
    private void dealFlowRules() {
        String redisFlowKey = getSentinelRedisKey(RULE_FLOW);
        String redisFlowChanel = getSentinelRedisChanel(RULE_FLOW);
        // 注册flow读取规则
        // 官方RedisDataSource是订阅获取,官方FileRefreshableDataSource是定时刷新获取。本方法是redis订阅+定时
        Converter<String, List<FlowRule>> parser = source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
        });
        ReadableDataSource<String, List<FlowRule>> redisFlowDataSource = new RedisDataSource2<>(parser, getRedisTemplate(), redisFlowKey, redisFlowChanel);
        FlowRuleManager.register2Property(redisFlowDataSource.getProperty());
        // 初始化加载一次所有flow规则
        String flowRulesStr = (String) getRedisTemplate().opsForValue().get(redisFlowKey);
        List<FlowRule> flowRuleList = parser.convert(flowRulesStr);
        redisFlowDataSource.getProperty().updateValue(flowRuleList);
        // 注册flow写入规则。这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.
        WritableDataSource<List<FlowRule>> flowRuleWds = new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisFlowKey, redisFlowChanel);
        WritableDataSourceRegistry.registerFlowDataSource(flowRuleWds);
    }
    /**
     * 处理熔断规则
     */
    public void dealDegradeRules() {
        String redisDegradeKey = getSentinelRedisKey(RULE_DEGRADE);
        String redisDegradeChanel = getSentinelRedisChanel(RULE_DEGRADE);
        Converter<String, List<DegradeRule>> parser = source -> JSON.parseObject(source, new TypeReference<List<DegradeRule>>() {
        });
        ReadableDataSource<String, List<DegradeRule>> redisDegradeDataSource = new RedisDataSource2<>(parser, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);
        DegradeRuleManager.register2Property(redisDegradeDataSource.getProperty());
        // 初始化加载一次所有flow规则
        String degradeRulesStr = (String) getRedisTemplate().opsForValue().get(redisDegradeKey);
        List<DegradeRule> degradeRuleList = parser.convert(degradeRulesStr);
        redisDegradeDataSource.getProperty().updateValue(degradeRuleList);
        // 注册degrade写入规则。这样收到控制台推送的规则时,Sentinel 会先更新到内存,然后将规则写入到文件中.
        WritableDataSource<List<DegradeRule>> degradeRuleWds = new RedisWritableDataSource<>(JSON::toJSONString, getRedisTemplate(), redisDegradeKey, redisDegradeChanel);
        WritableDataSourceRegistry.registerDegradeDataSource(degradeRuleWds);
    }
}
 
创建读取
package com.midea.sentinel;
import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.util.AssertUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import java.nio.charset.StandardCharsets;
/**
 * @Author: wangyn85 <br>
 * @CreateDate: 2023/07/04 11:09 <br>
 * @Description: 订阅redis通知,当sentinel规则发生变化时,拉取redis配置保存到内存。定时获取redis信息
 */
public class RedisDataSource2<T> extends AutoRefreshDataSource<String, T> {
    private static Logger logger = LoggerFactory.getLogger(RedisDataSource2.class);
    private static final String REDIS_SUCCESS_MSG = "OK";
    private String lastModified = "-1";
    private RedisTemplate redisTemplate;
    /**
     * 存入redis的对应规则的key
     */
    private String ruleKey;
    /**
     * redis订阅频道
     */
    private String channel;
    /**
     * 存入redis更新时间的key
     */
    private String ruleUpdateKey;
    /**
     * 定时获取redis信息
     */
    private static final long DEFAULT_REFRESH_MS = 300000L;
    public RedisDataSource2(Converter<String, T> parser, RedisTemplate redisTemplate, String ruleKey, String channel) {
        // 父级构造器,传入定时执行的时间
        super(parser, DEFAULT_REFRESH_MS);
        AssertUtil.notNull(redisTemplate, "redisTemplate can not be null");
        AssertUtil.notEmpty(ruleKey, "redis ruleKey can not be empty");
        AssertUtil.notEmpty(channel, "redis subscribe channel can not be empty");
        this.redisTemplate = redisTemplate;
        this.ruleKey = ruleKey;
        this.channel = channel;
        this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(ruleKey);
        subscribeFromChannel();
    }
    @Override
    public String readSource() throws Exception {
        return (String) redisTemplate.opsForValue().get(ruleKey);
    }
    @Override
    public void close() throws Exception {
        super.close();
        redisTemplate.execute((RedisCallback<String>) connection -> {
            connection.getSubscription().unsubscribe(channel.getBytes(StandardCharsets.UTF_8));
            return REDIS_SUCCESS_MSG;
        });
    }
    /**
     * 订阅消息队列
     */
    private void subscribeFromChannel() {
        redisTemplate.execute((RedisCallback<String>) connection -> {
            connection.subscribe((message, pattern) -> {
                byte[] bytes = message.getBody();
                String msg = new String(bytes, StandardCharsets.UTF_8);
                logger.info("{},接收到sentinel规则更新消息: {} ", channel, msg);
                try {
                    // 收到更新通知后,从redis获取全量数据更新到内存中
                    getProperty().updateValue(parser.convert(readSource()));
                } catch (Exception e) {
                    logger.error(channel + ",接收到sentinel规则更新消息:{},更新出错:{}", msg, e.getMessage());
                }
            }, channel.getBytes(StandardCharsets.UTF_8));
            return REDIS_SUCCESS_MSG;
        });
    }
    @Override
    protected boolean isModified() {
        // 根据redis的key查询是否有更新,没有更新返回false,就不用执行后面的拉取数据,提高性能
        String updateTimeStr = (String) redisTemplate.opsForValue().get(ruleUpdateKey);
        if (StringUtils.isEmpty(updateTimeStr) || updateTimeStr.equals(lastModified)) {
            return false;
        }
        this.lastModified = updateTimeStr;
        return true;
    }
}
 
创建写入
package com.midea.sentinel;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.WritableDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @Author: wangyn85 <br>
 * @CreateDate: 2023/07/04 09:47 <br>
 * @Description: 收到sentinel控制的规则更新后,讲规则持久化,并发布redis订阅通知
 */
public class RedisWritableDataSource<T> implements WritableDataSource<T> {
    private static Logger logger = LoggerFactory.getLogger(RedisWritableDataSource.class);
    private final String redisRuleKey;
    private final Converter<T, String> configEncoder;
    private final RedisTemplate redisTemplate;
    private final String redisFlowChanel;
    private final Lock lock;
    /**
     * 存入redis更新时间的key
     */
    private String ruleUpdateKey;
    private static final String SENTINEL_RULE_CHANGE = "CHANGE";
    public RedisWritableDataSource(Converter<T, String> configEncoder, RedisTemplate redisTemplate, String redisRuleKey, String redisFlowChanel) {
        this.redisRuleKey = redisRuleKey;
        this.configEncoder = configEncoder;
        this.redisTemplate = redisTemplate;
        this.redisFlowChanel = redisFlowChanel;
        this.lock = new ReentrantLock(true);
        this.ruleUpdateKey = SentinelRedisHelper.getRedisUpdateTimeKey(redisRuleKey);
    }
    @Override
    public void write(T value) throws Exception {
        this.lock.lock();
        try {
            logger.info("收到sentinel控制台规则写入信息,并准备持久化:{}", value);
            String convertResult = this.configEncoder.convert(value);
            redisTemplate.opsForValue().set(ruleUpdateKey, String.valueOf(System.currentTimeMillis()));
            redisTemplate.opsForValue().set(redisRuleKey, convertResult);
            logger.info("收到sentinel控制台规则写入信息,持久化后发布redis通知:{},信息:{}", this.redisFlowChanel, SENTINEL_RULE_CHANGE);
            redisTemplate.convertAndSend(this.redisFlowChanel, SENTINEL_RULE_CHANGE);
        } catch (Exception e) {
            logger.info("收到sentinel控制台规则写入信息,持久化出错:{}", e);
            throw e;
        } finally {
            this.lock.unlock();
        }
    }
    @Override
    public void close() throws Exception {
    }
}
 
创建需要的配置类
package com.midea.sentinel;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * @Author: wangyn85 <br>
 * @CreateDate: 2023/07/04 16:12 <br>
 * @Description: sentinel在redis持久化相关配置
 */
@Component
public class SentinelRedisHelper {
    @Value("${project.name}")
    private String projectName;
    private static String SENTINEL_REDIS_UPDATE_TIME = "%s:updateTime";
    private static SentinelRedisHelper self;
    @PostConstruct
    public void init() {
        self = this;
    }
    /**
     * 获取sentinel中配置的项目名
     *
     * @return
     */
    public static String getProjectName() {
        return self.projectName;
    }
    /**
     * 获取redis对应规则更新时间的key
     *
     * @param redisKey redis对应规则的key
     * @return
     */
    public static String getRedisUpdateTimeKey(String redisKey) {
        return String.format(SENTINEL_REDIS_UPDATE_TIME, redisKey);
    }
}
                


















