Sentinel源码—8.限流算法和设计模式总结二

news2025/7/9 13:30:24

大纲

1.关于限流的概述

2.高并发下的四大限流算法原理及实现

3.Sentinel使用的设计模式总结

3.Sentinel使用的设计模式总结

(1)责任链模式

(2)监听器模式

(3)适配器模式

(4)模版方法模式

(5)策略模式

(6)观察者模式

(1)责任链模式

一.责任链接口ProcessorSlot

二.责任链接口的抽象实现类

三.责任链的构建

Sentinel的功能都是靠一条链式的ProcessorSlot来完成的,这些ProcessorSlot的初始化以及调用便使用了责任链模式。

一.责任链接口ProcessorSlot

entry()方法相当于AOP的before()方法,也就是入口方法,因此责任链执行时会调用entry()方法。

exit()方法相当于AOP的after()方法,也就是出口方法,因此责任链执行结束时会调用exit()方法。

fireEntry()方法相当于AOP在执行完before()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的entry()方法。

fireExit()方法相当于AOP在执行完exit()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的exit()方法。

//A container of some process and ways of notification when the process is finished.
public interface ProcessorSlot<T> {
    //Entrance of this slot.
    //@param context         current Context
    //@param resourceWrapper current resource
    //@param param           generics parameter, usually is a com.alibaba.csp.sentinel.node.Node
    //@param count           tokens needed
    //@param prioritized     whether the entry is prioritized
    //@param args            parameters of the original call
    //@throws Throwable blocked exception or unexpected error
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable;

    //Means finish of #entry(Context, ResourceWrapper, Object, int, boolean, Object...).
    //@param context         current Context
    //@param resourceWrapper current resource
    //@param obj             relevant object (e.g. Node)
    //@param count           tokens needed
    //@param prioritized     whether the entry is prioritized
    //@param args            parameters of the original call
    //@throws Throwable blocked exception or unexpected error
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable;

    //Exit of this slot.
    //@param context         current Context
    //@param resourceWrapper current resource
    //@param count           tokens needed
    //@param args            parameters of the original call
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    //Means finish of #exit(Context, ResourceWrapper, int, Object...).
    //@param context         current Context
    //@param resourceWrapper current resource
    //@param count           tokens needed
    //@param args            parameters of the original call
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

二.责任链接口的抽象实现类

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    //下一个节点,这里的责任链是一个单向链表,因此next就是当前节点所指向的下一个节点
    private AbstractLinkedProcessorSlot<?> next = null;

    //触发执行责任链下一个节点的entry()方法
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

    //触发执行责任链下一个节点的exit()方法
    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {
            next.exit(context, resourceWrapper, count, args);
        }
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }
}

三.责任链的构建

Sentinel在默认情况下会通过DefaultProcessorSlotChain类来实现责任链的构建,当然我们也可以通过SPI机制指定一个自定义的责任链构建类。

//Builder for a default {@link ProcessorSlotChain}.
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        //创建一个DefaultProcessorSlotChain对象实例
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //通过SPI机制加载责任链的节点ProcessorSlot实现类
        //然后按照@Spi注解的order属性进行排序并进行实例化
        //最后将ProcessorSlot实例放到sortedSlotList中
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        //遍历已排好序的ProcessorSlot集合
        for (ProcessorSlot slot : sortedSlotList) {
            //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            //调用DefaultProcessorSlotChain.addLast()方法构建单向链表
            //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        //返回单向链表
        return chain;
    }
}

DefaultProcessorSlotChain构建的责任链如下:

(2)监听器模式

一.监听器接口和具体实现

二.监听器管理器接口和具体实现

三.使用方如何基于这套监听器机制管理规则

Sentinel在加载和配置规则的时候就使用了监听器模式。监听器模式的实现分为三大部分:监听器、监听器管理器、使用方(比如规则管理器)。

一.监听器接口和具体实现

//This class holds callback method when SentinelProperty#updateValue(Object) need inform the listener
//监听器接口,负责监听各个配置,包含两个方法:初始化方法以及更新方法
public interface PropertyListener<T> {
    //Callback method when SentinelProperty#updateValue(Object) need inform the listener.
    //规则变更时触发的回调方法
    void configUpdate(T value);
    //The first time of the value's load.
    //首次加载规则时触发的回调方法
    void configLoad(T value);
}

//流控规则管理器
public class FlowRuleManager {
    ...
    //监听器接口的具体实现:流控规则监听器
    private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
        //初始化规则
        @Override
        public synchronized void configUpdate(List<FlowRule> value) {
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
            if (rules != null) {
                flowRules = rules;
            }
            RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
        }

        //规则变更
        @Override
        public synchronized void configLoad(List<FlowRule> conf) {
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
            if (rules != null) {
                flowRules = rules;
            }
            RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
        }
    }
}

二.监听器管理器接口和具体实现

//监听器管理器接口
public interface SentinelProperty<T> {
    //添加监听者
    void addListener(PropertyListener<T> listener);

    //移除监听者
    void removeListener(PropertyListener<T> listener);

    //当监听值有变化时,调用此方法进行通知
    boolean updateValue(T newValue);
}

//监听器管理器具体实现
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
    //存放每个监听器
    protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
    //要监听的值
    private T value = null;

    public DynamicSentinelProperty() {

    }

    //添加监听器到集合
    @Override
    public void addListener(PropertyListener<T> listener) {
        listeners.add(listener);
        //回调监听器的configLoad()方法初始化规则配置
        listener.configLoad(value);
    }

    //移除监听器
    @Override
    public void removeListener(PropertyListener<T> listener) {
        listeners.remove(listener);
    }

    //更新值
    @Override
    public boolean updateValue(T newValue) {
        //如果值没变化,直接返回
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);

        //如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值
        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            listener.configUpdate(newValue);
        }
        return true;
    }

    //对比值是否发生了变化
    private boolean isEqual(T oldValue, T newValue) {
        if (oldValue == null && newValue == null) {
            return true;
        }
        if (oldValue == null) {
            return false;
        }
        return oldValue.equals(newValue);
    }

    //清空监听器集合
    public void close() {
        listeners.clear();
    }
}

三.使用方如何基于这套监听器机制管理规则

//流控规则管理器
public class FlowRuleManager {
    //维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则
    private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();

    //饿汉式单例模式实例化流控规则的监听器对象
    private static final FlowPropertyListener LISTENER = new FlowPropertyListener();

    //监听器对象的管理器
    private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

    //当FlowRuleManager类的静态方法首次被调用时,会执行这里的静态代码块(对应类加载的过程)
    static {
        //将流控规则监听器注册到监听器管理器中
        currentProperty.addListener(LISTENER);
        startMetricTimerListener();
    }

    //Load FlowRules, former rules will be replaced.
    //加载流控规则
    public static void loadRules(List<FlowRule> rules) {
        //通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置
        //其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules
        currentProperty.updateValue(rules);
    }
    ...
}

//使用方:通过流控规则管理器FlowRuleManager加载和监听流控规则
public class FlowQpsDemo {
    private static final String KEY = "abc";
    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();
    private static volatile boolean stop = false;
    private static final int threadCount = 32;
    private static int seconds = 60 + 40;

    public static void main(String[] args) throws Exception {
        //初始化QPS的流控规则
        initFlowQpsRule();

        //启动线程定时输出信息
        tick();

        //first make the system run on a very low condition
        //模拟QPS为32时的访问场景
        simulateTraffic();

        System.out.println("===== begin to do flow control");
        System.out.println("only 20 requests per second can pass");
    }

    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(KEY);
        //设置QPS的限制为20
        rule1.setCount(20);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);

        //首次调用FlowRuleManager的静态方法会加载FlowRuleManager类执行其静态代码块
        //加载流控规则
        FlowRuleManager.loadRules(rules);
    }

    private static void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");
            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;
                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;
                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;
                System.out.println(seconds + " send qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock);
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get() + ", block:" + block.get());
            System.exit(0);
        }
    }

    static class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;
                try {
                    //调用entry()方法开始规则验证
                    entry = SphU.entry(KEY);
                    //token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    block.incrementAndGet();
                } catch (Exception e2) {
                    //biz exception
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        //完成规则验证调用exit()方法
                        entry.exit();
                    }
                }
                Random random2 = new Random();
                try {
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }
}

(3)适配器模式

适配器模式是一种结构型设计模式,它允许将一个类的接口转换为客户端期望的另一个接口。在Sentinel中,使用适配器模式将不同框架和库的接口适配为统一的接口,如SphU类。SphU类提供了统一的入口,用于执行不同的资源保护逻辑。

public class SphU {
    private static final Object[] OBJECTS0 = new Object[0];

    private SphU() {            
    }

    //Record statistics and perform rule checking for the given resource.
    //@param name the unique name of the protected resource
    //@return the Entry of this invocation (used for mark the invocation complete and get context data)
    public static Entry entry(String name) throws BlockException {
        //调用CtSph.entry()方法创建一个Entry资源访问对象
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

    //Checking all Rules about the protected method.
    //@param method the protected method
    public static Entry entry(Method method) throws BlockException {
        return Env.sph.entry(method, EntryType.OUT, 1, OBJECTS0);
    }

    //Checking all Rules about the protected method.
    //@param method     the protected method
    //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    public static Entry entry(Method method, int batchCount) throws BlockException {
        return Env.sph.entry(method, EntryType.OUT, batchCount, OBJECTS0);
    }

    //Record statistics and perform rule checking for the given resource.
    //@param name       the unique string for the resource
    //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    public static Entry entry(String name, int batchCount) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, batchCount, OBJECTS0);
    }

    //Checking all Rules about the protected method.
    //@param method      the protected method
    //@param trafficType the traffic type (inbound, outbound or internal). 
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.
    public static Entry entry(Method method, EntryType trafficType) throws BlockException {
        return Env.sph.entry(method, trafficType, 1, OBJECTS0);
    }

    //Record statistics and perform rule checking for the given resource.
    public static Entry entry(String name, EntryType trafficType) throws BlockException {
        //调用CtSph.entry()方法创建一个Entry资源访问对象
        return Env.sph.entry(name, trafficType, 1, OBJECTS0);
    }

    //Checking all Rules about the protected method.
    //@param method      the protected method
    //@param trafficType the traffic type (inbound, outbound or internal). 
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.
    //@param batchCount  the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    public static Entry entry(Method method, EntryType trafficType, int batchCount) throws BlockException {
        return Env.sph.entry(method, trafficType, batchCount, OBJECTS0);
    }

    //Record statistics and perform rule checking for the given resource.
    public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException {
        return Env.sph.entry(name, trafficType, batchCount, OBJECTS0);
    }

    //Checking all Rules about the protected method.
    //@param method      the protected method
    //@param trafficType the traffic type (inbound, outbound or internal). 
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.
    //@param batchCount  the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens)
    //@param args        args for parameter flow control or customized slots
    //@return the Entry of this invocation (used for mark the invocation complete and get context data)
    public static Entry entry(Method method, EntryType trafficType, int batchCount, Object... args) throws BlockException {
        return Env.sph.entry(method, trafficType, batchCount, args);
    }

    //Record statistics and perform rule checking for the given resource.
    public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException {
        return Env.sph.entry(name, trafficType, batchCount, args);
    }

    //Record statistics and check all rules of the resource that indicates an async invocation.
    //@param name the unique name of the protected resource
    public static AsyncEntry asyncEntry(String name) throws BlockException {
        return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0);
    }

    //Record statistics and check all rules of the resource that indicates an async invocation.
    //@param name        the unique name for the protected resource
    //@param trafficType the traffic type (inbound, outbound or internal). 
    //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule.
    //@return the Entry of this invocation (used for mark the invocation complete and get context data)
    public static AsyncEntry asyncEntry(String name, EntryType trafficType) throws BlockException {
        return Env.sph.asyncEntry(name, trafficType, 1, OBJECTS0);
    }

    public static AsyncEntry asyncEntry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException {
        return Env.sph.asyncEntry(name, trafficType, batchCount, args);
    }

    //Record statistics and perform rule checking for the given resource. The entry is prioritized.
    public static Entry entryWithPriority(String name) throws BlockException {
        return Env.sph.entryWithPriority(name, EntryType.OUT, 1, true);
    }

    //Record statistics and perform rule checking for the given resource. The entry is prioritized.
    public static Entry entryWithPriority(String name, EntryType trafficType) throws BlockException {
        return Env.sph.entryWithPriority(name, trafficType, 1, true);
    }

    //Record statistics and perform rule checking for the given resource.
    public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
    }

    //Record statistics and perform rule checking for the given resource.
    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }

    //Record statistics and perform rule checking for the given resource that indicates an async invocation.
    public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType) throws BlockException {
        return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, OBJECTS0);
    }

    //Record statistics and perform rule checking for the given resource that indicates an async invocation.
    public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException {
        return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, args);
    }

    //Record statistics and perform rule checking for the given resource that indicates an async invocation.
    public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, int batchCount, Object[] args) throws BlockException {
        return Env.sph.asyncEntryWithType(name, resourceType, trafficType, batchCount, false, args);
    }
}

(4)模版方法模式

模板方法模式是一种行为型设计模式,它在一个方法中定义一个算法的骨架,将一些步骤延迟到子类中实现。Sentinel便使用了类似模板方法模式来处理熔断策略,但不是严格意义上的模板模式,因为模板方法模式一般会有一个final修饰的模板方法来定义整个流程。例如AbstractCircuitBreaker类定义了熔断策略的基本结构,具体的细节需要继承它并实现对应的方法。

public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    @Override
    public boolean tryPass(Context context) {
        ...
    }

    //提供抽象方法供子类实现
    abstract void resetStat();
}

//子类
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    @Override
    protected void resetStat() {
        stat.currentWindow().value().reset();
    }
}

(5)策略模式

策略模式是一种行为型设计模式,定义了一系列的算法,并将每个算法封装起来,使它们可以互相替换。Sentinel便在构建流控规则对象时使用了策略模式来设置不同的流控策略。例如TrafficShapingController接口定义了流控策略的方法,具体的实现类负责实现不同的流控策略。

//流控效果接口
public interface TrafficShapingController {
    //Check whether given resource entry can pass with provided count.
    //@param node resource node
    //@param acquireCount count to acquire
    //@param prioritized whether the request is prioritized
    //@return true if the resource entry can pass; false if it should be blocked
    boolean canPass(Node node, int acquireCount, boolean prioritized);
    
    //Check whether given resource entry can pass with provided count.
    //@param node resource node
    //@param acquireCount count to acquire
    //@return true if the resource entry can pass; false if it should be blocked
    boolean canPass(Node node, int acquireCount);
}

//流控规则管理器
public class FlowRuleManager {
    ...
    private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
        //初始化规则
        @Override
        public synchronized void configUpdate(List<FlowRule> value) {
            //构建流控规则对象
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
            if (rules != null) {
                flowRules = rules;
            }
            RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
        }

        //规则变更
        @Override
        public synchronized void configLoad(List<FlowRule> conf) {
            //构建流控规则对象
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
            if (rules != null) {
                flowRules = rules;
            }
            RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
        }
    }
}

public final class FlowRuleUtil {
    ...
    public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction, Predicate<FlowRule> filter, boolean shouldSort) {
        Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
        if (list == null || list.isEmpty()) {
            return newRuleMap;
        }

        Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
        for (FlowRule rule : list) {
            if (!isValidRule(rule)) {
                RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
                continue;
            }
            if (filter != null && !filter.test(rule)) {
                continue;
            }
            if (StringUtil.isBlank(rule.getLimitApp())) {
                rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
            }

            //获取[流控效果]处理器
            TrafficShapingController rater = generateRater(rule);
            rule.setRater(rater);
            //获取资源名
            K key = groupFunction.apply(rule);
            if (key == null) {
                continue;
            }

            //获取资源名对应的流控规则列表
            Set<FlowRule> flowRules = tmpMap.get(key);
            //将规则放到Map里,和当前资源绑定
            if (flowRules == null) {
                //Use hash set here to remove duplicate rules.
                flowRules = new HashSet<>();
                tmpMap.put(key, flowRules);
            }
            flowRules.add(rule);
        }

        Comparator<FlowRule> comparator = new FlowRuleComparator();
        for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
            List<FlowRule> rules = new ArrayList<>(entries.getValue());
            if (shouldSort) {
                //Sort the rules.
                Collections.sort(rules, comparator);
            }
            newRuleMap.put(entries.getKey(), rules);
        }
        return newRuleMap;
    }

    private static TrafficShapingController generateRater(FlowRule rule) {
        //判断只有当阈值类型为QPS时才生效
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            //根据流控效果选择不同的流量整形控制器TrafficShapingController
            switch (rule.getControlBehavior()) {
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://Warm Up预热模式——冷启动模式
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://排队等待模式
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER://Warm Up + 排队等待模式
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT://快速失败模式——Default默认模式
                default:
                    //Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        //默认模式:快速失败用的是DefaultController
        return new DefaultController(rule.getCount(), rule.getGrade());
    }
    ...
}

public class FlowRuleChecker {
    ...
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        //选择Node作为限流计算的依据
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        //先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器
        //然后调用TrafficShapingController.canPass()方法对请求进行检查
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
    ...
}

(6)观察者模式

Sentinel实现熔断功能使用了观察者模式。具体接口是CircuitBreakerStateChangeObserver,它负责感知熔断器状态发生变化后通知到各个观察者。

//1.首先定义观察者接口CircuitBreakerStateChangeObserver
public interface CircuitBreakerStateChangeObserver {
    void onStateChange(CircuitBreaker oldCircuitBreaker, CircuitBreaker newCircuitBreaker);
}

//2.在熔断器事件注册类EventObserverRegistry中:
//定义一个观察者Map(stateChangeObserverMap)用于存放观察者实例,并提供注册、移除和获取全部观察者的方法
public class EventObserverRegistry {
    private final Map<String, CircuitBreakerStateChangeObserver> stateChangeObserverMap = new HashMap<>();

    //注册观察者
    public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) {
        stateChangeObserverMap.put(name, observer);
    }

    //移除观察者
    public boolean removeStateChangeObserver(String name) {
        return stateChangeObserverMap.remove(name) != null;
    }

    //获取全部观察者
    public List<CircuitBreakerStateChangeObserver> getStateChangeObservers() {
        return new ArrayList<>(stateChangeObserverMap.values());
    }
    ...
}

//3.当熔断器状态发生变化时,通知所有已注册的观察者。
//比如在AbstractCircuitBreaker类中的notifyObservers方法中实现:
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) {
        for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers()) {
            observer.onStateChange(prevState, newState, rule, snapshotValue);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2340842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VulnHub-DarkHole_1靶机渗透教程

VulnHub-DarkHole_1靶机渗透教程 1.靶机部署 [Onepanda] Mik1ysomething 靶机下载&#xff1a;https://download.vulnhub.com/darkhole/DarkHole.zip 直接使用VMware打开就行 导入成功&#xff0c;打开虚拟机&#xff0c;到此虚拟机部署完成&#xff01; 注意&#xff1a…

边缘计算全透视:架构、应用与未来图景

边缘计算全透视&#xff1a;架构、应用与未来图景 一、产生背景二、本质三、特点&#xff08;一&#xff09;位置靠近数据源&#xff08;二&#xff09;分布式架构&#xff08;三&#xff09;实时性要求高 四、关键技术&#xff08;一&#xff09;硬件技术&#xff08;二&#…

MQ底层原理

RabbitMQ 概述 RabbitMQ 是⼀个开源的⾼性能、可扩展、消息中间件&#xff08;Message Broker&#xff09;&#xff0c;实现了 Advanced Message Queuing Protocol&#xff08;AMQP&#xff09;协议&#xff0c;可以帮助不同应⽤程序之间进⾏通信和数据交换。RabbitMQ 是由 E…

本地部署DeepSeek-R1模型接入PyCharm

以下是DeepSeek-R1本地部署及接入PyCharm的详细步骤指南,整合了视频内容及官方文档核心要点: 一、本地部署DeepSeek-R1模型 1. 安装Ollama框架 ​下载安装包 访问Ollama官网(https://ollama.com/download)Windows用户选择.exe文件,macOS用户选择.dmg包。 ​安装验证 双击…

Java基于SpringBoot的企业车辆管理系统,附源码+文档说明

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…

进阶篇 第 2 篇:自相关性深度解析 - ACF 与 PACF 图完全指南

进阶篇 第 2 篇&#xff1a;自相关性深度解析 - ACF 与 PACF 图完全指南 (图片来源: Negative Space on Pexels) 欢迎来到进阶系列的第二篇&#xff01;在上一篇&#xff0c;我们探讨了更高级的时间序列分解技术和强大的指数平滑 (ETS) 预测模型。ETS 模型通过巧妙的加权平均捕…

鸿蒙移动应用开发--渲染控制实验

任务&#xff1a;使用“对象数组”、“ForEach渲染”、“Badge角标组件”、“Grid布局”等相关知识&#xff0c;实现生效抽奖卡案例。如图1所示&#xff1a; 图1 生肖抽奖卡实例图 图1(a)中有6张生肖卡可以抽奖&#xff0c;每抽中一张&#xff0c;会通过弹层显示出来&#xf…

安宝特分享|AR智能装备赋能企业效率跃升

AR装备开启智能培训新时代 在智能制造与数字化转型浪潮下&#xff0c;传统培训体系正面临深度重构。安宝特基于工业级AR智能终端打造的培训系统&#xff0c;可助力企业构建智慧培训新生态。 AR技术在不同领域的助力 01远程指导方面 相较于传统视频教学的单向输出模式&#x…

SpringCloud组件—Eureka

一.背景 1.问题提出 我们在一个父项目下写了两个子项目&#xff0c;需要两个子项目之间相互调用。我们可以发送HTTP请求来获取我们想要的资源&#xff0c;具体实现的方法有很多&#xff0c;可以用HttpURLConnection、HttpClient、Okhttp、 RestTemplate等。 举个例子&#x…

模型 螃蟹效应

系列文章分享模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。个体互钳&#xff0c;团队难行。 1 螃蟹效应的应用 1.1 教育行业—优秀教师遭集体举报 行业背景&#xff1a;某市重点中学推行绩效改革&#xff0c;将班级升学率与教师奖金直接挂钩&#xff0c;打破原…

符号速率估计——小波变换法

[TOC]符号速率估计——小波变换法 一、原理 1.Haar小波变换 小波变换在信号处理领域被成为数学显微镜&#xff0c;不同于傅里叶变换&#xff0c;小波变换可以观测信号随时间变换的频谱特征&#xff0c;因此&#xff0c;常用于时频分析。   当小波变换前后位置处于同一个码元…

每日算法-250422

每日算法 - 250422 1561. 你可以获得的最大硬币数目 题目 思路 贪心 解题过程 根据题意&#xff0c;我们想要获得最大的硬币数目。每次选择时&#xff0c;有三堆硬币&#xff1a;最大的一堆会被 Alice 拿走&#xff0c;最小的一堆会被 Bob 拿走&#xff0c;剩下的一堆&#xf…

【MATLAB第116期】基于MATLAB的NBRO-XGBoost的SHAP可解释回归模型(敏感性分析方法)

【MATLAB第116期】基于MATLAB的NBRO-XGBoost的SHAP可解释回归模型&#xff08;敏感性分析方法&#xff09; 引言 该文章实现了一个可解释的回归模型&#xff0c;使用NBRO-XGBoost&#xff08;方法可以替换&#xff0c;但是需要有一定的编程基础&#xff09;来预测特征输出。该…

微信公众号消息模板推送没有“详情“按钮?无法点击跳转

踩坑&#xff01;&#xff01;&#xff01;&#xff01;踩坑&#xff01;&#xff01;&#xff01;&#xff01;踩坑&#xff01;&#xff01;&#xff01;&#xff01; 如下 简单说下我的情况&#xff0c;按官方文档传参url了 、但就是看不到查看详情按钮 。如下 真凶&#x…

电动单座V型调节阀的“隐形守护者”——阀杆节流套如何解决高流速冲刷难题

电动单座V型调节阀的“隐形守护者”——阀杆节流套如何解决高流速冲刷难题&#xff1f; 在工业自动化控制中&#xff0c;电动单座V型调节阀因其精准的流量调节能力&#xff0c;成为石油、化工等领域的核心设备。然而&#xff0c;长期高流速工况下&#xff0c;阀芯与阀座的冲刷腐…

自动驾驶与机器人算法学习

自动驾驶与机器人算法学习 直播与学习途径 欢迎你的点赞关注~

【网络编程】TCP数据流套接字编程

目录 一. TCP API 二. TCP回显服务器-客户端 1. 服务器 2. 客户端 3. 服务端-客户端工作流程 4. 服务器优化 TCP数据流套接字编程是一种基于有连接协议的网络通信方式 一. TCP API 在TCP编程中&#xff0c;主要使用两个核心类ServerSocket 和 Socket ServerSocket Ser…

从零开始配置 Zabbix 数据库监控:MySQL 实战指南

Zabbix作为一款开源的分布式监控工具&#xff0c;在监控MySQL数据库方面具有显著优势&#xff0c;能够为数据库的稳定运行、性能优化和故障排查提供全面支持。以下是使用Zabbix监控MySQL数据库的配置。 一、安装 Zabbix Agent 和 MySQL 1. 安装 Zabbix Agent services:zabbix…

Java学习手册:RESTful API 设计原则

一、RESTful API 概述 REST&#xff08;Representational State Transfer&#xff09;即表述性状态转移&#xff0c;是一种软件架构风格&#xff0c;用于设计网络应用程序。RESTful API 是符合 REST 原则的 Web API&#xff0c;通过使用 HTTP 协议和标准方法&#xff08;GET、…

读一篇AI论文并理解——通过幻觉诱导优化缓解大型视觉语言模型中的幻觉

目录 论文介绍 标题 作者 Publish Date Time PDF文章下载地址 文章理解分析 &#x1f4c4; 中文摘要&#xff1a;《通过幻觉诱导优化缓解大型视觉语言模型中的幻觉》 &#x1f9e0; 论文核心动机 &#x1f680; 创新方法&#xff1a;HIO&#xff08;Hallucination-In…