Redis:Feed流、ZSet点赞排序+滚动分页+滑动窗口限流
目录一、ZSet点赞模块1. 点赞功能实现2. 按照点赞时间将点赞人排序3.定时任务更新点赞量4.总结二、Feed流1.Feed流实现方案1.1 拉模式读扩散1.2 推模式写扩散1.3 读写混合2.推模式实现将用户发布的动态推送给粉丝3.SortedSet滚动分页查询4.总结三、ZSet滑动窗口限流一、ZSet点赞模块1. 点赞功能实现点赞功能也要同时写数据库和写缓存但写的内容不一致所以不属于双写一致性问题。TransactionalpublicResultlikeBlog(Longid){StringuserIdUserHolder.getUser().getId().toString();// 用户是否点赞了DoublescorestringRedisTemplate.opsForZSet().score(like:id,userId);if(scorenull){// 这里直接用null来判断返回值null或空对象区别在于是否分配内存地址String\List\Map判断空对象就要用isEmpty()了stringRedisTemplate.opsForZSet().add(like:id,userId,System.currentTimeMillis());// 用当前ms时间作为排序值update().setSql(liked liked 1).eq(id,id).update();}else{stringRedisTemplate.opsForZSet().remove(like:id,userId);update().setSql(liked liked - 1).eq(id,id).update();}returnResult.ok();}2. 按照点赞时间将点赞人排序publicResultqueryBlogLikes(Longid){// 点赞顺序前5的用户id集合SetStringuserIdsstringRedisTemplate.opsForZSet().range(like:id,0,4);if(userIdsnull||userIds.isEmpty()){// 没有点赞信息returnResult.ok(Collections.emptyList());}/** 查询用户 select * from tb_user where id in (#{userIds}) order by field (id, #{userIds}) * order by field (id, #{userIds}) 是因为in查询的返回结果是按照用户id排序的而不是按照userIds排序导致返回给前端的用户顺序错乱 * userIds.stream().map(Long::valueOf).collect(Collectors.toList())是将String类型的List转为Long类型的List * StrUtil.join(,,userIds)是将userIds拼接成字符串以逗号分隔 */ListUserusersuserService.query().in(id,userIds.stream().map(Long::valueOf).collect(Collectors.toList())).last(ORDER BY FIELD(id,StrUtil.join(,,userIds))).list();ListUserDTOuserDTOSnewArrayList();//为了防止用户信息泄露所以只返回给前端用户基础信息users.forEach(user-{UserDTOuserDTOnewUserDTO();BeanUtil.copyProperties(user,userDTO,true);userDTOS.add(userDTO);});returnResult.ok(userDTOS);}3.定时任务更新点赞量ComponentpublicclassLikeReconciliationTask{AutowiredprivateStringRedisTemplate stringRedisTemplate;AutowiredprivateBlogMapper blogMapper;AutowiredprivateLikeScheduledTaskManager taskManager;// 自定义定时任务线程池privateScheduledExecutorService scheduledExecutorService;PostConstruct// 项目启动时执行publicvoidstartReconciliationTask(){// 实例化任务线程池scheduledExecutorServicenewScheduledThreadPoolExecutor(3,Executors.defaultThreadFactory(),newThreadPoolExecutor.AbortPolicy());// 步骤1计算从当前时间到「凌晨2点」的初始延迟longinitialDelaycalculateDelayTo2AM();// 步骤2每天24小时执行一次对账任务taskManager.getScheduledExecutorService().scheduleAtFixedRate(this::doReconciliation,initialDelay,24,TimeUnit.HOURS);}// 计算当前时间到次日凌晨2点的延迟单位秒privatelongcalculateDelayTo2AM(){longnowSystem.currentTimeMillis();// 今天凌晨2点的时间戳longtoday2AMgetToday2AMTimestamp();// 如果当前时间已过凌晨2点取次日2点if(nowtoday2AM){today2AM24*60*60*1000L;}// 转换为秒ScheduledExecutorService 支持毫秒这里也可以直接返回毫秒return(today2AM-now)/1000;}// 获取今天凌晨2点的时间戳privatelonggetToday2AMTimestamp(){java.util.Calendar caljava.util.Calendar.getInstance();cal.set(java.util.Calendar.HOUR_OF_DAY,2);cal.set(java.util.Calendar.MINUTE,0);cal.set(java.util.Calendar.SECOND,0);cal.set(java.util.Calendar.MILLISECOND,0);returncal.getTimeInMillis();}// 核心对账逻辑privatevoiddoReconciliation(){ListLongblogIdsblogMapper.listAllBlogIds();for(Long blogId:blogIds){try{String likeKeylike:blogId;Long redisLikeCountstringRedisTemplate.opsForZSet().zCard(likeKey);if(redisLikeCountnull){redisLikeCount0L;}Blog blogblogMapper.selectById(blogId);Integer dbLikeCountblog.getLiked()null?0:blog.getLiked();if(!redisLikeCount.equals(dbLikeCount.longValue())){log.warn(博客{}点赞数不一致Redis{}数据库{}开始修正,blogId,redisLikeCount,dbLikeCount);blogMapper.update().set(liked,redisLikeCount).eq(id,blogId).update();}}catch(Exception e){log.error(对账博客{}点赞数失败,blogId,e);}}}}4.总结点赞功能SortedSet进行点赞以及点赞排序点赞结果先更新redis后更新数据库并采用scheduledExecutorService凌晨两点遍历数据库的笔记表并查询redis对应记录不一致则更新数据库。二、Feed流Feed流也叫关注推送为用户持续推送消息的一种方式。Feed流常见有两种实现模式Timeline不做内容筛选推送内容按照内容发布时间排序例如微信朋友圈。智能排序利用推荐算法推送用户感兴趣的内容例如抖音。1.Feed流实现方案在redis中可以为用户创建收件箱和发件箱收件箱用来接收消息推送给用户的消息保存在用户的收件箱中。发件箱用户自己的消息保存在该用户的发件箱中用于发送消息。1.1 拉模式读扩散每个用户只需维护发件箱用户A发消息时服务器都会将该消息保存到用户A的发件箱中。用户B想要查看消息时会从用户A的发件箱中获取消息。优点每条消息只存一份节省redis内存空间缺点读消息要查发件箱延迟高1.2 推模式写扩散每个用户只需维护收件箱用户A发消息时服务器都会将该消息推送到用户B的收件箱中。用户B想要查看消息时只需要从自己的收件箱中获取消息。优点读消息速度快延迟低缺点每条消息保存n份非常占用redis内存空间1.3 读写混合读写混合结合了推模式和拉模式的优点。每个用户需维护发件箱和收件箱。用户A发消息时若接收该消息的用户太多好友太多或粉丝太多那么服务器都会将该消息保存到用户A的发件箱中。更进一步设计若用户A是用户B的特别关注那么将消息保存到用户B的收件箱中。若用户C是用户A的僵尸粉或用户C不经常查看消息那么将消息保存到用户A的发件箱中。若接收该消息的用户很少那么服务器都会将该消息保存到其他用户的收件箱中。用户B想要查看消息时先查看自己的收件箱再查看其他用户的发件箱。具体可以灵活设计例如如果用户是活跃用户那么可以把消息推到他的收件箱中反之不经常刷抖音的用户没必要给他开辟收件箱他想刷视频那么就去其他用户的发件箱中获取视频。新用户的话不能亏待他就用推模式等用久了再杀熟。此外如果用户经常发高质量视频那么完全可以用推模式直接将通知推送到其他用户的手机中反之低质量视频就不用推拉模式节省内存就够了。2.推模式实现将用户发布的动态推送给粉丝Override// 发布动态publicResultsaveBlog(Blogblog){// 获取登录用户UserDTOuserUserHolder.getUser();blog.setUserId(user.getId());// 保存动态save(blog);// 获取粉丝ListFollowfollowsfollowService.query().eq(follow_user_id,user.getId()).list();if(follows!null!follows.isEmpty()){// 获取粉丝idfollows.forEach(follow-{// 推送当前动态的id到所有粉丝的redis收件箱中stringRedisTemplate.opsForZSet().add(receive:follow.getUserId(),blog.getId().toString(),System.currentTimeMillis());});}// 返回idreturnResult.ok(blog.getId());}3.SortedSet滚动分页查询由于SortedSet是有序的当按照插入时间排序时当数据有序时若按照下标分页会出现如下问题t1时刻读取时间最新的5条数据则第一页数据为6~10。t2时刻插入了一条数据11此时t3时刻会读取第二页数据正常应该读取1~5但是由于中途插入数据导致下标发生变化导致数据下标混乱本质会重复的读数据6。如下图所示滚动分页的思想是记录t1时刻读的最后一条数据6t2时刻从6号数据的下一个数据即5号数据开始读。如何记录每次查询的最后一条数据可以按照元素进入集合的先后顺序给每个元素编号元素的编号给定后就固定不变每次记录读的最后一个数据的编号x而非下标然后给定偏移量offset为与x编号相同的元素个数那么下一次从x-offset开始读。Override// lastId:上一页的最小时间戳也就是这一页的最大时间戳开区间;offset:偏移量publicResultofFollow(LonglastId,Integeroffset){// 按照score范围查询查score范围在[0,lastId)即[0,lastId-1]中score最大的3条数据SetZSetOperations.TypedTupleStringtypedTuplesstringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(receive:UserHolder.getUser().getId().toString(),0,lastId,offset,3);if(typedTuplesnull||typedTuples.isEmpty()){returnResult.ok();//关注的用户没有发布动态}// 获取所有动态的blogId以及该页的最小时间戳lastId偏移量offset// blogId用于页面显示lastId和offset作为下一页取数据的依据ListLongblogIdsnewArrayList(typedTuples.size());longlastIdNextPage0;intoffsetNextPage1;for(ZSetOperations.TypedTupleStringtuple:typedTuples){// blogIdblogIds.add(Long.valueOf(tuple.getValue()));// offsetNextPage为与lastIdNextPage相同的元素个数其中lastIdNextPage为该页的最小编号时间其实重复的可能性很小longtimetuple.getScore().longValue();if(timelastIdNextPage){offsetNextPage;}else{offsetNextPage1;}// 记录该页的最小编号时间其实重复的可能性很小lastIdNextPagetime;}// 根据blogIds查对应的动态信息ListBlogblogsquery().in(id,blogIds).last(ORDER BY FIELD(id,StrUtil.join(,,blogIds))).list();returnResult.ok(newScrollResult(blogs,lastIdNextPage,offsetNextPage));}其实可以简单抽象为一个算法问题给定一个包含数字1-10的数组数组元素有序排列从数组末尾向前遍历规定每一轮输出3个元素不同轮次之间输出的元素不能有重复轮次内的元素可以重复输出各轮次的元素。4.总结粉丝推送功能Set记录粉丝集合foreach List将笔记ID保存到粉丝的SortedSet中。粉丝点击关注列表时直接查询SortedSet中的笔记并返回。三、ZSet滑动窗口限流以QPS100为例每个请求到达时首先记录当前时间t统计以t为结尾的前1s的请求数t, zremrangebyscore key, t-1000, t删除1s外的请求得到当前窗口的请求数zcard key。zremrangebyscore时间复杂度O(mlog n)二分/skiplist查找窗口边界删除边界外的m个元素如果前1s的请求数小于100那么放行并zadd key t trandom将当前请求加入集合否则直接return拒绝当前请求的后续业务。上述流程能确保每个请求都是以自身t构造滑动窗口判断是否限流而非静态窗口。为了避免并发问题要使用lua脚本-- 限流核心参数QPS阈值100、时间窗口大小1000ms1秒locallimit100localwindow1000-- 1. 获取当前时间戳毫秒级保证精度localnowtonumber(redis.call(TIME)[1])*1000tonumber(redis.call(TIME)[2])/1000-- 2. 清理窗口外的过期请求删除 [当前时间-1000ms, 当前时间] 之外的请求-- KEYS[1] 限流的Redis Key如rate_limit:apiredis.call(ZREMRANGEBYSCORE,KEYS[1],0,now-window)-- 3. 统计当前1秒窗口内的请求总数localcounttonumber(redis.call(ZCARD,KEYS[1]))-- 4. 限流判断未超过阈值则放行超过则拒绝ifcountlimitthen-- 放行将当前请求加入有序集合score当前时间戳value时间戳随机数避免重复-- 随机数防止同一毫秒多个请求value重复被覆盖localrandommath.random(1,1000)redis.call(ZADD,KEYS[1],now,now.._..random)-- 设置过期时间自动清理闲置key节省内存2秒足够覆盖窗口redis.call(EXPIRE,KEYS[1],2)-- 返回1代表请求放行return1else-- 返回0代表请求被限流拒绝return0end
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2425448.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!