Redis 实现限流策略

news2025/7/19 21:02:55

除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。

比如在 UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定 时间内允许的次数,超过了次数那就是非法行为。对非法行为,业务必须规定适当的惩处策略。

如何使用 Redis 来实现简单限流策略?

接口的定义

# 指定用户 user_id 的某个行为 action_key 在特定的时间内 period 只允许发生一定的次数
max_count
def is_action_allowed(user_id, action_key, period, max_count):
 return True
# 调用这个接口 , 一分钟内只允许最多回复 5 个帖子
can_reply = is_action_allowed("laoqian", "reply", 60, 5)
if can_reply:
 do_reply()
else:
 raise ActionThresholdOverflow()

解决方案

这个限流需求中存在一个滑动时间窗口,想想 zset 数据结构的 score 值,是不是可以通过 score 来圈出这个时间窗口来。

Redis sorted sets | Redis

而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉。那这个 zset 的 value 填什么比较合适呢?它只需要保证唯一性即可,用 uuid 会比较浪费空间,那就改用毫秒时间戳吧。

 如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个 key 保存下来。同一个用户同一种行为用一个 zset 记录。

为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。

通过统计滑动窗口内的行为数量与阈值 max_count 进行比较就可以得出当前的行为是否 允许。用代码表示如下:

import org.apache.shiro.util.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Component
public class SimpleRateLimiter {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public boolean isActionAllowed(String userId, String actionKey, int period, long maxCount) {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowMills = System.currentTimeMillis();

        List<Object> objects = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
//            打开管道
            connection.openPipeline();
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            // 添加命令
            connection.zAdd(keyBytes, nowMills
                    , String.valueOf(nowMills).getBytes(StandardCharsets.UTF_8));
            // 清楚无用数据
            connection.zRemRangeByScore(keyBytes, 0, nowMills - period * 1000L);
            // 判断规定时间内请求数量
            connection.zCard(keyBytes);
            // 重新设置过期时间
            connection.expire(keyBytes, period + 1);
            // 关闭管道 不需要close 否则拿不到返回值
            // connection.closePipeline();
            // 这里一定要返回null,最终pipeline的执行结果,才会返回给最外层
            return null;
        });
        if (!CollectionUtils.isEmpty(objects)) {
            return (Long) objects.get(2) <= maxCount;
        }
        return true;
    }
}

测试:

import com.hcx.common.redisdemo.SimpleRateLimiter;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class MallCommonApplicationTests {

    @Autowired
    private SimpleRateLimiter limiter;

    @Test
    void contextLoads() {
        for (int i = 0; i < 20; i++) {
            System.out.println("结果:" + limiter.isActionAllowed("laoqian", "reply", 60, 5));
        }
    }
}
/*
结果:true
结果:true
结果:true
结果:true
结果:true
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
结果:false
*/

整体思路就是:每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。

 zset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了。 因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升 Redis 存取效率。

但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

高级限流算法——漏斗限流

漏斗限流是最常用的限流方法之一,顾名思义,这个算法的灵感源于漏斗(funnel)的结构。 

漏洞的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,它就会变满,直至再也 装不进去。如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。 如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。

如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着 系统允许该行为的最大频率。

单机版漏斗算法实现_JAVA

import java.util.HashMap;
import java.util.Map;


public class FunnelRateLimiter {
    private final Map<String, Funnel> funnels = new HashMap<>();

    public static void main(String[] args) {
        FunnelRateLimiter limiter = new FunnelRateLimiter();
        for (int i = 0; i < 20; i++) {
            System.out.println("次数" + (i + 1) + ": " + limiter.isActionAllowed("user", "get", 10, 1));
        }
    }

    /**
     * @param userId      用户ID
     * @param actionKey   行为key
     * @param capacity    漏斗容量
     * @param leakingRate 漏嘴流水速率 单位mill
     * @return 行为能否执行
     */
    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1); // 需要 1 个 quota
    }

    static class Funnel {
        /**
         * 漏斗容量
         */
        int capacity;
        /**
         * 漏嘴流水速率
         */
        float leakingRate;
        /**
         * 漏斗剩余空间
         */
        int leftQuota;
        /**
         * 上一次漏水时间
         */
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        /**
         * 计算剩余空间
         */
        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs; // 距离上一次漏水过去了多久
            int deltaQuota = (int) (deltaTs * leakingRate); // 腾出多少空间
            if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // 腾出空间太小,最小单位是 1
                return;
            }
            this.leftQuota += deltaQuota;
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        /**
         * @param quota 待申请空间
         * @return 申请是否成功
         */
        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }
}
/*
次数1: true
次数2: true
次数3: true
次数4: true
次数5: true
次数6: true
次数7: true
次数8: true
次数9: true
次数10: true
次数11: false
次数12: false
次数13: false
次数14: false
次数15: false
次数16: false
次数17: false
次数18: false
次数19: false
次数20: false
*/

Funnel 对象的 make_space 方法是漏斗算法的核心,其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。

能腾出多少空间取决于过去了多久以及流水的速率。Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量

分布式的漏斗算法该如何实现?能不能使用 Redis 的基础数据结构来搞定?

观察 Funnel 对象的几个字段,发现可以将 Funnel 对象的内容按字段存储到一 个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。

但是有个问题,无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。

而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。

  • 如果重试的话,就会导致性能下降。
  • 如果放弃的话,就会影响用户体验。

同时,代码的复杂度也跟着升高很多。

Redis-Cell——限流 Redis 模块

令牌桶算法_百度百科 (baidu.com)

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并 提供了原子的限流指令。

该模块只有 1 条指令 cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这 个指令具体该如何使用。

图中capacity设置的是总容量-1

上面这个指令的意思是允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速 率),漏斗的初始容量为 16,也就是说一开始可以连续回复 16 个帖子,然后才开始受漏水速率的影响。我们看到这个指令中漏水速率变成了 2 个参数,替代了之前的单个浮点数。用 两个参数相除的结果来表达漏水速率相对单个浮点数要更加直观一些。 

  • > cl.throttle laoqian:reply 15 30 60
  • 1) (integer) 0 # 0 表示允许,1 表示拒绝
  • 2) (integer) 15 # 漏斗容量 capacity
  • 3) (integer) 14 # 漏斗剩余空间 left_quota
  • 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
  • 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周 到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想 阻塞线程,也可以异步定时任务来重试。

package com.hcx.common.redisdemo;

import org.apache.shiro.util.CollectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.List;

@Component
public class FunnelRateLimiter {
    /**
     * lua 脚本
     */
    public static final String LUA_SCRIPT = "return redis.call('cl.throttle',KEYS[1], ARGV[1], ARGV[2], ARGV[3], ARGV[4])";
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * @param key        键值
     * @param capacity   总漏斗容量-1
     * @param operations 漏水速率
     * @param seconds    漏水周期
     * @param quota      待申请空间
     * @return 申请结果
     */
    public boolean isActionAllowed(String key, int capacity, int operations, int seconds, int quota) {
        try {
            DefaultRedisScript<List> script = new DefaultRedisScript<>(LUA_SCRIPT, List.class);
            List<Long> rs = redisTemplate.execute(script, Collections.singletonList(key), capacity, operations, seconds, quota);
            if (CollectionUtils.isEmpty(rs)) return false;

            System.out.println("漏斗容量:" + rs.get(1));
            System.out.println("剩余空间:" + rs.get(2));
            System.out.println("最少多长时间后再试:" + rs.get(3));
            System.out.println("多长时间后漏斗为空:" + rs.get(4));
            return rs.get(0) == 0;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
/*
漏斗容量:16
剩余空间:15
最少多长时间后再试:-1
多长时间后漏斗为空:2
结果:true
……
最少多长时间后再试:-1
多长时间后漏斗为空:27
结果:true
漏斗容量:16
剩余空间:1
最少多长时间后再试:-1
多长时间后漏斗为空:29
结果:true
漏斗容量:16
剩余空间:0
最少多长时间后再试:-1
多长时间后漏斗为空:31
结果:true
漏斗容量:16
剩余空间:0
最少多长时间后再试:1
多长时间后漏斗为空:31
结果:false
漏斗容量:16
剩余空间:0
最少多长时间后再试:1
多长时间后漏斗为空:31
结果:false
漏斗容量:16
剩余空间:0
最少多长时间后再试:1
多长时间后漏斗为空:31
结果:false
漏斗容量:16
剩余空间:0
最少多长时间后再试:1
多长时间后漏斗为空:31
结果:false
*/

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周 到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想 阻塞线程,也可以异步定时任务来重试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/8009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Word处理控件Aspose.Words功能演示:使用 Python 合并 Word 文档

当您必须将多个 Word 文档合并到一个文件中时&#xff0c;可能会有多种情况。例如&#xff0c;当多人编写文档的不同部分时&#xff0c;您需要在最后组合内容。另一方面&#xff0c;您可能需要将所有发票合并到一个 Word 文档中。为了以编程方式执行此合并&#xff0c;本文介绍…

码神之路项目总结(四)

一、图片上传 请求接口&#xff1a; 思路&#xff1a; 1、后端用MultipartFile接收前端传过来的文件信息 2、用uuid将文件重命名&#xff0c;然后将文件以新文件名通过七牛云上传到七牛云服务器 二、导航--文章分类 请求接口&#xff1a; 思路&#xff1a; 1、直接从文章…

全球绿色建筑的 10 个最酷的例子

全球变暖和气候变化&#xff0c;是我们日常生活中需要解决的社会问题。从减少塑料的使用到尽量减少汽车的使用&#xff0c;因为它会导致化石燃料的燃烧&#xff0c;每个人都必须具有社会意识&#xff0c;以减轻全球变暖的影响。 在这种情况下&#xff0c;我们为世界十大绿色建…

MyBatis:基础入门

MyBatis基础入门 文章目录MyBatis基础入门一、MyBatis 简介二、MyBatis 工作原理三、MyBatis 与 Hibernate 的对比1. 原生 Jdbc 存在的问题2. MyBatis 与 Hibernate 的对比四、MyBatis demo1. 引入 Maven 依赖2. 创建mybatis-config.xml配置文件3. 编写JavaBean类4. Mapper层编…

Java开发高质量代码建议1:三元操作符的类型务必一致

在Java开发中&#xff0c;三元操作符是 if-else 的简化写法&#xff0c;在项目中使用它的地方很多&#xff0c;也非常好用&#xff0c;但是好用又简单的东西并不表示就可以随便用&#xff0c;如下代码: public class Main {public static void main(String[] args) {int i 90…

ZYNQ之FPGA学习----Vivado功能仿真

1 Vivado功能仿真 阅读本文需先学习: FPGA学习----Vivado软件使用 典型的FPGA设计流程&#xff0c;如图所示&#xff1a; 图片来自《领航者ZYNQ之FPGA开发指南》 Vivado 设计套件内部集成了仿真器 Vivado Simulator&#xff0c;能够在设计流程的不同阶段运行设计的功能仿真和…

搞笑段子很皮的文案系列001,可爱有趣文案系列合集

大家好&#xff0c;我是我赢助手&#xff0c;专注于自媒体短视频去水印、去重和文案提取运营。 今天给大家分享下收集的一些文案&#xff1a;搞笑段子很皮的文案系列 1. 女人之间不用吵架&#xff0c;你比她漂亮就行了。 2. 暧昧&#xff0c;把这两个字拆开&#xff0c;就是真…

docker centos7容器中文乱码问题解决

中文乱码与yum安装失败问题 如下图所示&#xff0c;往文件里输入内容&#xff1a;测试中文乱码问题&#xff0c;结果发现乱码。 甚至如果文件名带有中文也会乱码。 执行locale命令&#xff0c;如下所示。 LANGen_US.UTF-8 LC_CTYPE"en_US.UTF-8" LC_NUMERIC"…

C++:重定义:符号重定义:变量重定义(二):解决变量重定义

C&#xff1a;重定义&#xff1a;符号重定义&#xff1a;变量重定义_hongwen_yul的博客-CSDN博客 上一篇文章中&#xff0c;我们知道解决变量重复定义其中一个办法是&#xff1a;尽量不要头文件中定义变量&#xff0c;头文件只做变量的声明。但是如果我们一定要在头文件中定义…

【2022最新核心面试资料 】最强Java面试八股文秋招offer召唤术!入职薪资53k

前言 我分享的这份秋招 Java 后端开发面试总结包含了 JavaOOP、Java 集合容器、Java 异常、并发编程、Java 反射、Java 序列化、JVM、Redis、Spring MVC、MyBatis、MySQL 数据库、消息中间件 MQ、Dubbo、Linux、ZooKeeper、 分布式 &数据结构与算法等 25 个专题技术点&…

图像生成模型简介

因为DALLE 2是基于CLIP和GLIDE模型写的&#xff0c;作者在写论文的时候弄得十分简略&#xff0c;所以直接看那篇论文可能并不会获得很多信息。今天先帮大家区分一下几个生成模型的区别。 GAN 图片来源&#xff1a;What are Diffusion Models? | LilLog (lilianweng.github.io)…

rsync远程同步

目录 一、rsync简介 1.1 rsync介绍 1.2 rsync同步方式 二、rsync特性 三、rsync与cp、scp对比 四、rsync命令 五、rsync本地复制实例 六、配置源的两种表示方法 七、Rsync 同步源 八、配置 Rsync 下行同步 九、inotify简介 十、配置RsyncInotify 实时同步 十一、r…

【linux kernel】基于ARM64分析linux内核的链接脚本vmlinux.lds.S

文章目录一、导读二、链接器是什么三、链接脚本四、linux内核的链接脚本4-1 头文件包含描述4-2 参数设置和宏定义描述4-3 SECTIONS内容分析五、linux内核的“头”六、总结一、导读 在linux内核中&#xff0c;arch目录下放置的是关于linux内核所支持的具体架构相关的代码描述文…

EFK部署centos7.9(一)ES单节点部署

Elasticsearch部署 系统类型&#xff1a;Centos7.9 节点IP&#xff1a;192.168.11.139 软件版本&#xff1a;jdk-8u121-linux-x64.tar、elasticsearch-6.5.4.tar. 1.首先上传jdk的包 tar xzf jdk-8u121-linux-x64.tar.gz -C /usr/local/ 解压jdk的包 cd /usr/local/ 切…

逻辑回归预测瘀血阻络证||LogRegression 二分类 python3

要求 把数据集分为训练集和测试集使用逻辑回归训练、预测&#xff0c;得出相应的分类指标准确率accuracy&#xff0c;精确率precision&#xff0c;召回率recall&#xff0c;F1-score&#xff0c;并画出最终的ROC曲线&#xff0c;得出AUC值。 数据格式 664条样本 每条103个属性…

列的类型定义——整形类型

文章目录 前言一、整数类型的附带属性 类型名称后面的小括号unsignedauto_increment总结前言 1&#xff09;采用26字母和0-9的自然数加上下互相 ‘_’ 组成&#xff0c;命名简洁明确&#xff0c;多个单词用下划线 ‘_’ 隔开 2&#xff09;全部小写命名&#xff0c;尽量避免出…

猿创征文|计算机专业硕博研究生提高效率的10款科研工具

前言 大家好&#xff0c;我是帝都某高校的一名在读研究生&#xff0c;研究方向为人工智能安全、强化学习、漏洞挖掘。今天想跟各位计算机相关专业的硕士生、博士生们分享几款超级实用并且能够提高科研效率的工具&#xff01;&#xff01;&#xff01;希望能够得到大家的一键四…

【角点检测】 基于各向异性高斯方向导数滤波器实现图像角点检测附matlab代码

1 内容介绍 为了改进噪声鲁棒性和定位准确性,利用各向异性高斯方向导数滤波器,提出多方向角点检测算法.该算法利用一组各向异性高斯方向导数滤波器对输入图像进行卷积处理得到各个方向的滤波器响应.对于每个像素点,利用它与周围邻近像素点的滤波器响应的相关信息构造局部自相关…

Revit导入Cad图元丢失不正确解决和链接CAD功能

一、导入Cad图元丢失或者图元不正确解决&#xff1a; 导入Cad的时候我们会遇到导入图元丢失或者图元不正确等情况&#xff0c;具体解决如下 01.天正画图时一定要导出t3格式&#xff0c;因为Revit只识别t3版本 02.Cad画图时&#xff0c;最后一定要将图元炸开&#xff0c;然后在框…

NVIDIA NCCL 源码学习(五)- 路径计算

上节NCCL完成了对机器PCI系统拓扑的建图&#xff0c;其中建好的图如下所示&#xff0c;其中GPU之间是通过NVLink连接起来的 为了方便之后的搜索channel&#xff0c;接下来NCCL会先计算GPU和NIC节点到其他任意节点之间的最优路径&#xff0c;以及对应的带宽&#xff0c;即最优路…