【微服务】Nacos通知客户端服务变更以及重试机制

news2025/7/18 2:39:06

💖Spring家族源码解析及微服务系列

✨【微服务】Nacos服务发现源码分析

✨【微服务】SpringBoot监听器机制以及在Nacos中的应用

✨【微服务】Nacos客户端微服务注册原理流程

✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理

✨【微服务】SpringBoot启动流程注册FeignClient

✨【微服务】SpringBoot启动流程初始化OpenFeign的入口

✨Spring Bean的生命周期

✨Spring事务原理

✨SpringBoot自动装配原理机制及过程

✨SpringBoot获取处理器流程

✨SpringBoot中处理器映射关系注册流程

✨Spring5.x中Bean初始化流程

✨Spring中Bean定义的注册流程

目录

💖前言

💖通知客户端服务变更以及重试机制

✨流程图

✨Service的updateIPs

✨UdpPushService组件

💫onApplicationEvent()处理事件

💫UDP推送客户端

✨UDP推送ACK

💫重试任务

✨取消重试

💫初始化Receiver并启动一个线程

💫Receiver接收器

💖总结


💖前言

    本篇文章将讲解如下内容:服务端是如何通知客户端服务变更的、使用了什么协议、推送失败如何处理、推送给谁?如果需要重试,怎么判断的,如何避免多次重试?

💖通知客户端服务变更以及重试机制

✨流程图

 

✨Service的updateIPs

Service的逻辑在之前已经详细解析,这里只是提一下。重点看下面逻辑:

UdpPushService组件

这里在Spring的事件机制中已经详细分析,感兴趣的读者可以回去翻看一下。

💫onApplicationEvent()处理事件

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        // If upgrade to 2.0.X, do not push for v1.如果升级到2.0.X,不要推动 v1。
        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
        // 从事件获取服务
        Service service = event.getService();
        // 服务名
        String serviceName = service.getName();
        // 命名空间
        String namespaceId = service.getNamespaceId();
        //merge some change events to reduce the push frequency:
        // 合并一些更改事件以减少推送频率:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
            return;
        }
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                // 服务已变更,把它添加到推送队列
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // nacos服务端给每个客户端实例推送udp包时,该实例就是一个udp客户端,
                // clientMap中存放的就是这些udp客户端信息
                ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }
                
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client);
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client);
                        continue;
                    }
                    
                    AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client);
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    // switchDomain.getDefaultPushCacheMillis()默认是10秒,
                    // 即10000毫秒,不会进入这个分支,所以compressData = null
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();
                        
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }
                    // 组装ack数据
                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key,
                                    new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
                        }
                    }
                    
                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.getKey()));
                    // UDP推送客户端
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
                
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
            
        }, 1000, TimeUnit.MILLISECONDS);
        // 合并一些更改事件以减少推送频率,上面代码判断
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
        
    }

主要逻辑:

  1. 从事件获取服务,然后获取服务名、命名空间
  2. 更改事件是否已经合并,已经推送不再推送
  3. 获取订阅客户端,没有订阅者则return。nacos服务端给每个客户端实例推送udp包时,该实例就是一个udp客户端,clientMap中存放的就是这些udp客户端信息
  4. 组装ack数据,UDP通知订阅的客户端
  5. 合并一些更改事件以减少推送频率,上面代码判断

💫UDP推送客户端

    private static AckEntry udpPush(AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        // 初始化是0,如果重试次数大于MAX_RETRY_TIMES=1次,就不再发送udp包了
        if (ackEntry.getRetryTimes() > Constants.UDP_MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.getRetryTimes(),
                    ackEntry.getKey());
            // 清理数据
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            return ackEntry;
        }
        
        try {
            if (!ackMap.containsKey(ackEntry.getKey())) {
                MetricsMonitor.incrementPush();
            }
            // 填充数据预备重试
            ackMap.put(ackEntry.getKey(), ackEntry);
            udpSendTimeMap.put(ackEntry.getKey(), System.currentTimeMillis());
            
            Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());
            // UDP推送客户端服务发生变更
            udpSocket.send(ackEntry.getOrigin());

            // 重试次数原子加1,以防多次推送
            ackEntry.increaseRetryTime();
            // 10秒内未接收到客户端UDP推送ack则重试
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.getData(),
                    ackEntry.getOrigin().getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            
            return null;
        }
    }

主要逻辑:

  1. 初始化是0,如果重试次数大于MAX_RETRY_TIMES=1次,就不再发送udp包了
  2. ackMap填充数据预备重试
  3. UDP推送通知客户端服务发生变更
  4. 重试次数原子加1,以防多次推送
  5. 启动延迟任务10秒内未接收到客户端UDP推送ack重试

✨UDP推送ACK

    这里是client模块的PushReceiver#run()逻辑,本质上就是我们微服务自己,也就是客户端。这里只是提取一段重要逻辑要保持连贯性,上一篇已具体分析。

    @Override
    public void run() {
        while (!closed) {
            try {
                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                // 监听Nacos服务端服务实例信息变更后的通知
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
                    serviceInfoHolder.processServiceInfo(pushPacket.data);
                    
                    // send ack to server发送ack到服务器
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
                    // dump data to server将数据转储到服务器
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only仅仅发送ack
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                // 发送ack到服务端
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

主要逻辑:

  1. 监听Nacos服务端服务实例信息变更后的通知
  2. 解析注册中心推送的结果,组装回调ack报文,将注册中心推送的变更服务信息缓存到本地serviceInfoMap
  3. UDP推送ack到注册中心,以便注册中心决定是否需要重试

💫重试任务

    public static class Retransmitter implements Runnable {
        
        AckEntry ackEntry;
        
        public Retransmitter(AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }
        
        @Override
        public void run() {
            // 重试推送
            if (ackMap.containsKey(ackEntry.getKey())) {
                Loggers.PUSH.info("retry to push data, key: " + ackEntry.getKey());
                udpPush(ackEntry);
            }
        }
    }

    这里逻辑很简单,如果在10秒收到来自客户端的ACK通知,那么这里的ackMap就匹配不到ackKey,就不会重试了;否则重试UDP推送客户端。

✨取消重试

💫初始化Receiver并启动一个线程

    static {
        try {
            // 初始化套接字
            udpSocket = new DatagramSocket();
            // 初始化接收客户端UDP通知的UDP接收器
            Receiver receiver = new Receiver();
            // 启动一个线程
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
            
        } catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

    UdpPushService在构造初始化时就初始化接收客户端UDP通知的UDP接收器了。Receiver 实现了Runnable接口,所以启动一个线程执行该任务。任务逻辑如下面所示:

💫Receiver接收器

public static class Receiver implements Runnable {
        
        @Override
        public void run() {
            while (true) {
                byte[] buffer = new byte[1024 * 64];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                
                try {
                    // 监听客户端ACK确认
                    udpSocket.receive(packet);
                    
                    String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
                    AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);
                    
                    InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    int port = socketAddress.getPort();
                    // 接受到ACK响应的时间距离上次接受到的时间之差如果大于10秒
                    // ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)
                    if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
                    }
                    
                    String ackKey = AckEntry.getAckKey(ip, port, ackPacket.lastRefTime);
                    // 返回的是删除健的值
                    // 这一删除就意味着不需要重试了即Retransmitter的run()中匹配不到该key
                    // 如果超过10秒客户端未UDP推送ack还是可能重试的
                    AckEntry ackEntry = ackMap.remove(ackKey);
                    if (ackEntry == null) {
                        throw new IllegalStateException(
                                "unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
                    }

                    // 每个数据包的耗时
                    long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
                    
                    Loggers.PUSH
                            .info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,
                                    port, pushCost, ackMap.size(), MetricsMonitor.getTotalPushMonitor().get());
                    
                    MetricsMonitor.incrementPushCost(pushCost);
                    
                    udpSendTimeMap.remove(ackKey);
                    
                } catch (Throwable e) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
                }
            }
        }

主要逻辑:

  1. 监听客户端ACK确认
  2. 接受到ACK响应的时间距离上次接受到的时间之差如果大于10秒,打印警告
  3. 拼接ackKey,然后根据ackKey删除健的值。这一删除就意味着不需要重试了即Retransmitterrun()中匹配不到该ackKey,如果超过10秒客户端未UDP推送ack还是可能重试的
  4. 每个数据包的耗时,打印接收到ack日志,清理数据等

💖总结

采用UDP通知客户端,客户端使用PushReceiver监听服务端UDP通知,监听到则缓存到本地,组装ACK数据UDP通知服务端;启动延迟任务,10秒没有接收到ACK则重试;Receiver接收器监听客户端ACK推送,接收到则移除ackKey及其value。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/5745.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节一面后,我又看了一遍ThreadLocal核心原理

前言&#xff1a;上周在面试字节的时候&#xff0c;问到了ThreadLocal的核心原理&#xff0c;由于这个知识点当时有些淡忘&#xff0c;因此作此篇文章进行知识的记录&#xff0c;同时希望能够帮助到其他的小伙伴儿们。 本篇文章记录的基础知识&#xff0c;适合在学Java的小白&a…

动态 SQL

文章目录一、学习目的二、动态 SQL 中的元素三、条件查询操作四、更新操作五、复杂查询操作1.foreach 元素中的属性2.foreach 元素迭代数组3.foreach 元素迭代 List4.foreach 元素迭代 Map一、学习目的 在实际项目的开发中&#xff0c;开发人员在使用 JDBC 或其他持久层框架进…

【汇编 C++】多态底层---虚表、__vfptr指针

前言&#xff1a;如果对多态不太了解的话&#xff0c;可以看我的这篇文章《C多态》&#xff0c;另外本文中出现到的汇编代码&#xff0c;我都会予以解释&#xff0c;看不懂没关系&#xff0c;知道大概意思就行&#xff0c;能不讲汇编的地方我就不讲&#xff1b; 本文使用到的工…

networkx学习记录

networkx学习记录networkx学习记录1. 创建图表2. 节点3. 边4.检查图的元素5.从图中删除元素6.使用图构造函数7.访问边和邻居8.向图、节点和边添加属性9.有向图10. 绘制图形networkx学习记录 1. 创建图表 创建一个空图 import networkx as nx G nx.Graph()此时如果报以下错误…

HTML网页设计结课作业——11张精美网页 html+css+javascript+bootstarp

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 精彩专栏推荐&#x1f4…

学姐突然问我键盘怎么选?原来是为了这个...

前言&#xff1a; 上个星期学姐来问我该买啥键盘&#xff0c;说是自己用的笔记本的键盘实在是不太好用&#xff0c;很喜欢机械键盘的手感&#xff0c;但是常规的机械键盘有太大了而且声音十分大&#xff0c;对她们女生来说并不是很友好。于是我给她推荐了我现在正在用的这款键盘…

头歌-信息安全技术-Java生成验证码

头歌-信息安全技术-Java生成验证码一、第1关&#xff1a;使用Servlet生成验证码1、任务描述2、编程要求3、评测代码二、第2关&#xff1a;用户登录时校验验证码是否正确1、任务描述2、编程要求3、评测代码三、第3关&#xff1a;使用Kaptcha组件生成验证码1、任务描述2、编程要求…

2023年前端开发未来可期

☆ 对于很多质疑&#xff0c;很多不解&#xff0c;本文将从 △ 目前企业内前端开发职业的占比&#xff1b; △ 目前业内开发语言的受欢迎程度&#xff1b; △ 近期社区问答活跃度&#xff1b; 等维度来说明目前前端这个职业的所处位置。 ☆ 还有强硬的干货&#xff0c;通过深入…

跳槽前恶补面试题,成功上岸阿里,拿到33k的测开offer

不知不觉间&#xff0c;时间过得真快啊。作为一名程序员&#xff0c;应该都清楚每年的3、4月份和9、10月份都是跳槽的黄金季&#xff0c;各大企业在这段时间会大量招聘人才。在这段时间里&#xff0c;有人欢喜有人悲。想必各位在跳槽前都会做好充足的准备&#xff0c;同样做足了…

详细讲解网络协议:TCP和UDP什么区别?

该文章是学习了 B 站 up 主的视频做的总结&#xff0c;讲的很通俗易懂&#xff0c;首先感谢博主的分享。视频地址&#xff1a;https://www.bilibili.com/video/BV1kV411j7hA/?spm_id_from333.337.search-card.all.click&vd_source0a3d4c746a63d737330e738fa043eaf6 重新认…

【HDU No. 3567】八数码 II Eight II

【HDU No. 3567】八数码 II Eight II 杭电OJ 题目地址 【题意】 八数码&#xff0c;也叫作“九宫格”&#xff0c;来自一个古老的游戏。在这个游戏中&#xff0c;你将得到一个33的棋盘和8个方块。方块的编号为1&#xff5e;8&#xff0c;其中一块方块丢失&#xff0c;称之为“…

【python】基础复习

注&#xff1a;最后有面试挑战&#xff0c;看看自己掌握了吗 文章目录python的应用基础语法编码标识符python保留字第一个注释多行语句数字(Number)类型字符串(String)print 默认输出是换行的&#xff0c;如果要实现不换行需要在变量末尾加上 end""&#xff1a;impor…

猿创征文|在校大学生学习UI设计必备工具及日常生活中使用的软件

嗨&#xff0c;大家好&#xff0c;我是异星球的小怪同志 一个想法有点乱七八糟的小怪 如果觉得对你有帮助&#xff0c;请支持一波。 希望未来可以一起学习交流。 我是一名在校大二的学生&#xff0c;目前在学习关于UI设计方向的一些课程&#xff0c;平时会用到UI设计必备的工…

我终于读懂了适配器模式。。。

文章目录&#x1f5fe;&#x1f306;什么是适配器模式&#xff1f;&#x1f3ef;类适配器模式&#x1f3f0;对象适配器模式⛺️接口适配器模式&#x1f3ed;适配器模式在SpringMVC 框架应用的源码剖析&#x1f5fc;适配器模式的注意事项和细节&#x1f306;什么是适配器模式&am…

基于SDN环境下的DDoS异常攻击的检测与缓解--实验

基于SDN环境下的DDoS异常攻击的检测与缓解--实验基于SDN环境下的DDoS异常攻击的检测与缓解--实验1.安装floodlight2.安装sFlow-RT流量监控设备3.命令行安装curl工具4.构建拓扑5.DDoS 攻击检测6.DDoS 攻击防御7.总结申明&#xff1a; 未经许可&#xff0c;禁止以任何形式转载&am…

PNG怎么转换成PDF?这篇文章教会你

有时候我们需要查找一些图片资料并将它打印出来&#xff0c;但是在网上的图片大多是以PNG格式存在的&#xff0c;这个时候&#xff0c;我们就需要先利用一些转换软件把PNG转换成PDF文件的格式&#xff0c;从而方便我们进行打印。那么你们知道PNG转PDF怎么转换吗&#xff1f;今天…

第四章:前缀和、差分(数列)

前缀和差分一、前缀和1、 什么是前缀和2、 前缀和的作用3、 前缀和的例题和模板&#xff08;1&#xff09;一维数组的前缀和C版C版&#xff08;2&#xff09;二维数组的前缀和a.思路&#xff1a;b.题目和模板&#xff1a;C版C版二、差分1、什么是差分&#xff1f;2、差分有什么…

FFplay文档解读-43-视频过滤器十八

29.170 telecine 将电视电影处理应用于视频。 此过滤器接受以下选项&#xff1a; first_field选项解释top, ttop field firstbottom, b底部字段优先默认值为top pattern一串数字&#xff0c;表示希望应用的下拉模式。 默认值为23。 Some typical patterns:NTSC output (30i…

传统纸业如何实现数字化,S2B2C系统网站赋能渠道提升供应链管理效率

一千多年前&#xff0c;我们老祖宗发明了造纸术&#xff0c;纸张成为方便、廉价的信息载体&#xff0c;由此影响了中国乃至世界文明的进程。如今&#xff0c;随着信息技术的普及&#xff0c;纸张作为信息载体的功能日益弱化&#xff0c;但作为一种环保材料将会更广泛地融入我们…

通过宠物商店理解java面向对象

前言&#xff1a;本篇博客&#xff0c;适合刚刚学完java基础语法的但是&#xff0c;对于面向对象&#xff0c;理解不够深刻的读者&#xff0c;本文通过经典的宠物商店&#xff0c;来让读者深刻的理解&#xff0c;面向对象&#xff0c;IS-A&#xff0c;HAS-A法则。本文不仅仅是简…