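💖Spring family source-code analysis and microservices series
✨[Microservices] Nacos service discovery source-code analysis
✨[Microservices] The SpringBoot listener mechanism and its use in Nacos
✨[Microservices] How the Nacos client registers a microservice
✨[Microservices] How SpringCloud implements load balancing with Ribbon
✨[Microservices] Registering FeignClient during SpringBoot startup
✨[Microservices] The entry point for initializing OpenFeign during SpringBoot startup
✨The Spring Bean lifecycle
✨How Spring transactions work
✨The SpringBoot auto-configuration mechanism and process
✨How SpringBoot obtains a handler
✨How SpringBoot registers handler mappings
✨Bean initialization in Spring 5.x
✨How bean definitions are registered in Spring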
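Contents
💖Preface
💖Notifying clients of service changes and the retry mechanism
✨Flow chart
✨updateIPs in Service
✨The UdpPushService component
💫Handling the event in onApplicationEvent()
💫UDP push to the client
✨Sending the ACK over UDP
💫The retry task
✨Cancelling the retry
💫Initializing the Receiver and starting a thread
💫The Receiver
💖Summary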
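💖Preface
This article covers the following: how the server notifies clients of a service change, which protocol it uses, how a failed push is handled, and who receives the push. It also covers how the server decides whether a retry is needed and how it avoids retrying repeatedly.
💖Notifying clients of service changes and the retry mechanism
✨Flow chart
✨updateIPs in Service
The logic of Service was analyzed in detail in an earlier article, so it is only mentioned here. Focus on the following logic:
✨The UdpPushService component
How this component is triggered was analyzed in detail in the article on Spring's event mechanism; interested readers can go back to it.
💫Handling the event in onApplicationEvent()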
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    // If upgrade to 2.0.X, do not push for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    // Get the service from the event
    Service service = event.getService();
    // Service name
    String serviceName = service.getName();
    // Namespace
    String namespaceId = service.getNamespaceId();
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
        return;
    }
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            // The service has changed; add it to the push queue
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            // When the Nacos server pushes UDP packets to a client instance, that instance acts as a UDP client;
            // clientMap holds the information about these UDP clients
            ConcurrentMap<String, PushClient> clients = subscriberServiceV1.getClientMap()
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client);
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client);
                    continue;
                }
                AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client);
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                // switchDomain.getDefaultPushCacheMillis() defaults to 10 seconds (10000 ms),
                // so this branch is not entered by default and compressData stays null
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Assemble the ack data
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key,
                                new org.javatuples.Pair<>(ackEntry.getOrigin().getData(), ackEntry.getData()));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.getKey()));
                // Push to the client over UDP
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    // Record the pending task so that the check above can merge later change events
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
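Main logic:
- Get the service from the event, then its service name and namespace.
- Check whether a push task for this service is already pending in futureMap. If so, return: the change event is merged into the pending push and no new push is scheduled.
- Inside the task, which runs after a delay of 1000 ms, get the subscribed clients and return if there are none. When the Nacos server pushes UDP packets to a client instance, that instance acts as a UDP client; clientMap holds the information about these UDP clients.
- Assemble the ack data and notify each subscribed client over UDP.
- Store the returned Future in futureMap so that the check above can merge later change events and reduce the push frequency. The task removes the entry again in its finally block.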
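The merge step described above can be looked at in isolation. The following is a minimal sketch, not Nacos code: `PushMerger`, its methods and the `##` key separator are hypothetical names chosen for illustration. It also uses `computeIfAbsent` instead of the `containsKey`/`put` pair from the listing, an assumption made here so that the check and the registration happen atomically.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, not part of Nacos. */
final class PushMerger {
    private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger pushCount = new AtomicInteger();
    private final long delayMillis;

    PushMerger(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Returns true if a push was scheduled, false if the event was merged into a pending one. */
    boolean onServiceChanged(String namespaceId, String serviceName) {
        String key = namespaceId + "##" + serviceName; // assumed key format
        boolean[] scheduled = {false};
        futureMap.computeIfAbsent(key, k -> {
            scheduled[0] = true;
            return scheduler.schedule(() -> {
                try {
                    pushCount.incrementAndGet(); // stands in for the real UDP push
                } finally {
                    futureMap.remove(k); // let the next change event schedule again
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        });
        return scheduled[0];
    }

    /** Lets already scheduled pushes run, then returns how many pushes were executed. */
    int drainAndCount() {
        scheduler.shutdown(); // delayed tasks still run after shutdown() by default
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushCount.get();
    }
}
```

Calling `onServiceChanged` several times for the same service within the delay window schedules only one push; once that push has run and removed its key, the next change event schedules a new one.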
💫UDP push to the client
private static AckEntry udpPush(AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // retryTimes starts at 0; once it exceeds UDP_MAX_RETRY_TIMES (1), no more UDP packets are sent
    if (ackEntry.getRetryTimes() > Constants.UDP_MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.getRetryTimes(),
                ackEntry.getKey());
        // Clean up
        ackMap.remove(ackEntry.getKey());
        udpSendTimeMap.remove(ackEntry.getKey());
        MetricsMonitor.incrementFailPush();
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.getKey())) {
            MetricsMonitor.incrementPush();
        }
        // Record the entry in preparation for a retry
        ackMap.put(ackEntry.getKey(), ackEntry);
        udpSendTimeMap.put(ackEntry.getKey(), System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());
        // Notify the client over UDP that the service has changed
        udpSocket.send(ackEntry.getOrigin());
        // Atomically increment the retry count, which bounds the number of pushes
        ackEntry.increaseRetryTime();
        // Retry if no ack is received from the client within 10 seconds
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.getData(),
                ackEntry.getOrigin().getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.getKey());
        udpSendTimeMap.remove(ackEntry.getKey());
        MetricsMonitor.incrementFailPush();
        return null;
    }
}
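Main logic:
- The retry count starts at 0. Once it exceeds UDP_MAX_RETRY_TIMES (1), no more UDP packets are sent and the entry is cleaned up. Because the count is incremented after every send, a packet is sent at most twice: the initial push and one retry.
- Put the entry into ackMap, and its send time into udpSendTimeMap, in preparation for a retry.
- Notify the client over UDP that the service has changed.
- Atomically increment the retry count, which bounds the number of pushes.
- Schedule a delayed task that retries if no ack is received from the client within 10 seconds.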
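The guard at the top of `udpPush` is easy to misread by one, so here is a minimal sketch of just the counter. `RetryBudget` and its methods are hypothetical names for illustration; the limit of 1 is the value the article states for `UDP_MAX_RETRY_TIMES`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, not part of Nacos. */
final class RetryBudget {
    // Value the article gives for Constants.UDP_MAX_RETRY_TIMES
    static final int MAX_RETRY_TIMES = 1;

    private final AtomicInteger retryTimes = new AtomicInteger();

    /** Mirrors the guard in udpPush: returns true if a packet would be sent. */
    boolean trySend() {
        if (retryTimes.get() > MAX_RETRY_TIMES) {
            return false; // budget used up, the entry would be cleaned up here
        }
        // ... the real code sends the UDP packet at this point ...
        retryTimes.incrementAndGet();
        return true;
    }

    /** Counts how many packets go out when no ack ever arrives. */
    static int countSendsWithoutAck() {
        RetryBudget budget = new RetryBudget();
        int sends = 0;
        while (budget.trySend()) {
            sends++;
        }
        return sends; // 2: the initial push plus one retry
    }
}
```

With the check written as `retryTimes > 1` and the increment placed after the send, the third call is the first one that is refused.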
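✨Sending the ACK over UDP
This is the PushReceiver#run() logic in the client module, which is essentially our own microservice, that is, the client. Only the key excerpt is shown here to keep the walkthrough continuous; the previous article analyzed it in detail.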
@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Listen for notifications the Nacos server sends when service instance information changes
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if (PUSH_PACKAGE_TYPE_DOM.equals(pushPacket.type) || PUSH_PACKAGE_TYPE_SERVICE.equals(pushPacket.type)) {
                serviceInfoHolder.processServiceInfo(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"\"}";
            } else if (PUSH_PACKAGE_TYPE_DUMP.equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(serviceInfoHolder.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            // Send the ack back to the server
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
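Main logic:
- Listen for notifications the Nacos server sends when service instance information changes.
- Parse the data pushed by the registry, cache the changed service information in the local serviceInfoMap, and assemble the ack message.
- Send the ack over UDP to the registry so that it can decide whether a retry is needed.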
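To make the round trip concrete, here is a minimal loopback sketch using only `java.net`. It is not Nacos code: `UdpAckRoundTrip` is a hypothetical name, the push payload is simplified to the bare `lastRefTime` as text (the real packet is compressed JSON), and both "server" and "client" live in one method. What it keeps from the listing is the important detail that the ack is sent to `packet.getSocketAddress()`, the address the push came from.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical demo class, not part of Nacos. */
final class UdpAckRoundTrip {

    /** Sends a simplified push over loopback and returns the ack text the "server" receives. */
    static String roundTrip(long lastRefTime) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            client.setSoTimeout(2000);

            // Server side: push to the client's UDP address (payload simplified to lastRefTime)
            byte[] push = String.valueOf(lastRefTime).getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(push, push.length, client.getLocalSocketAddress()));

            // Client side: receive the push and read lastRefTime from it
            DatagramPacket in = new DatagramPacket(new byte[1024], 1024);
            client.receive(in);
            long ref = Long.parseLong(
                    new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8).trim());

            // Client side: reply to the sender address, echoing lastRefTime as in the listing
            byte[] ack = ("{\"type\": \"push-ack\", \"lastRefTime\":\"" + ref + "\", \"data\":\"\"}")
                    .getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(ack, ack.length, in.getSocketAddress()));

            // Server side: receive the ack
            DatagramPacket ackIn = new DatagramPacket(new byte[1024], 1024);
            server.receive(ackIn);
            return new String(ackIn.getData(), 0, ackIn.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the client echoes `lastRefTime` and replies from the socket that received the push, the server can rebuild the same key from the ack's source IP, source port and `lastRefTime`.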
💫The retry task
public static class Retransmitter implements Runnable {

    AckEntry ackEntry;

    public Retransmitter(AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }

    @Override
    public void run() {
        // Push again if the entry is still waiting for an ack
        if (ackMap.containsKey(ackEntry.getKey())) {
            Loggers.PUSH.info("retry to push data, key: " + ackEntry.getKey());
            udpPush(ackEntry);
        }
    }
}
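The logic here is simple: if the ACK from the client arrives within 10 seconds, its key has already been removed from ackMap, so nothing matches and no retry happens; otherwise the UDP push to the client is retried.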
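The interplay between the delayed task and ackMap can be sketched as follows. This is a simplified illustration under stated assumptions, not the Nacos implementation: `AckTrackingDemo` and its methods are hypothetical, "sending" is just a counter, the timeout is shortened from 10 seconds to 200 ms, and the retry is not rescheduled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo class, not part of Nacos. */
final class AckTrackingDemo {
    private final ConcurrentMap<String, String> ackMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger sends = new AtomicInteger();

    /** "Sends" a packet, remembers it as unacked, and schedules one check after the timeout. */
    ScheduledFuture<?> push(String key, String payload, long timeoutMillis) {
        ackMap.put(key, payload);
        sends.incrementAndGet();
        return scheduler.schedule(() -> {
            // Same test as Retransmitter.run(): retry only if the entry is still unacked
            if (ackMap.containsKey(key)) {
                sends.incrementAndGet();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the client's ack arrives; removing the key cancels the pending retry. */
    void onAck(String key) {
        ackMap.remove(key);
    }

    /** Returns how many packets were sent for one push, with or without a timely ack. */
    static int sendsFor(boolean ackInTime) {
        AckTrackingDemo demo = new AckTrackingDemo();
        try {
            String key = "127.0.0.1,50000,1"; // illustrative key
            ScheduledFuture<?> check = demo.push(key, "payload", 200);
            if (ackInTime) {
                demo.onAck(key);
            }
            check.get(5, TimeUnit.SECONDS); // wait until the delayed check has run
            return demo.sends.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            demo.scheduler.shutdownNow();
        }
    }
}
```

Note that the delayed task is never cancelled explicitly. It always runs, and the absence of the key in the map is what turns it into a no-op.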
✨Cancelling the retry
💫Initializing the Receiver and starting a thread
static {
    try {
        // Create the socket
        udpSocket = new DatagramSocket();
        // Create the receiver that handles UDP acks from clients
        Receiver receiver = new Receiver();
        // Start a thread
        Thread inThread = new Thread(receiver);
        inThread.setDaemon(true);
        inThread.setName("com.alibaba.nacos.naming.push.receiver");
        inThread.start();
    } catch (SocketException e) {
        Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
    }
}
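UdpPushService creates the receiver for client UDP acks in its static initializer, that is, when the class is loaded rather than when an instance is constructed. Receiver implements Runnable, so a daemon thread is started to run it. The task logic is shown below: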
💫The Receiver
public static class Receiver implements Runnable {

    @Override
    public void run() {
        while (true) {
            byte[] buffer = new byte[1024 * 64];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            try {
                // Listen for ACKs from clients
                udpSocket.receive(packet);
                String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
                AckPacket ackPacket = JacksonUtils.toObj(json, AckPacket.class);
                InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                int port = socketAddress.getPort();
                // Warn if more than 10 seconds have passed between the push's lastRefTime and this ACK
                // ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L)
                if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {
                    Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
                }
                String ackKey = AckEntry.getAckKey(ip, port, ackPacket.lastRefTime);
                // remove() returns the value that was mapped to the key.
                // Removing the entry means no retry is needed: Retransmitter.run() no longer finds the key.
                // If the client's ACK takes longer than 10 seconds, a retry may already have been sent.
                AckEntry ackEntry = ackMap.remove(ackKey);
                if (ackEntry == null) {
                    throw new IllegalStateException(
                            "unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
                }
                // Time taken by this packet
                long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
                Loggers.PUSH
                        .info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", json, ip,
                                port, pushCost, ackMap.size(), MetricsMonitor.getTotalPushMonitor().get());
                MetricsMonitor.incrementPushCost(pushCost);
                udpSendTimeMap.remove(ackKey);
            } catch (Throwable e) {
                Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
            }
        }
    }
}
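Main logic:
- Listen for ACKs from clients.
- If the time between the lastRefTime carried in the ACK (stamped when the push was prepared) and the arrival of the ACK exceeds 10 seconds, log a warning.
- Build the ackKey and remove its entry from ackMap. This removal is what cancels the retry, because Retransmitter's run() no longer finds the ackKey. If the client's ACK takes longer than 10 seconds, a retry may already have been sent.
- Compute the time taken by the packet, log the received ack, and clean up udpSendTimeMap.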
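The bookkeeping on the server side can be sketched with two maps and a key function. This is an illustration under stated assumptions: `AckBookkeeping` and its methods are hypothetical, the comma-joined key format is assumed rather than taken from the listing, and a missing entry yields -1 instead of the exception thrown in the real code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of Nacos. */
final class AckBookkeeping {
    // Same value as the comment in the listing: TimeUnit.SECONDS.toNanos(10L)
    static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);

    private final ConcurrentMap<String, String> ackMap = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();

    /** Assumed key format: client ip, client port and lastRefTime joined by commas. */
    static String ackKey(String ip, int port, long lastRefTime) {
        return ip + "," + port + "," + lastRefTime;
    }

    /** Records an outgoing push so that a later ack can be matched against it. */
    void recordSend(String ip, int port, long lastRefTime, long sendTimeMillis) {
        String key = ackKey(ip, port, lastRefTime);
        ackMap.put(key, "payload");
        udpSendTimeMap.put(key, sendTimeMillis);
    }

    /** Returns the push cost in milliseconds, or -1 if no pending push matches the ack. */
    long onAck(String ip, int port, long lastRefTime, long nowMillis) {
        String key = ackKey(ip, port, lastRefTime);
        if (ackMap.remove(key) == null) {
            return -1; // unknown or already handled; the retry task would find nothing either
        }
        Long sentAt = udpSendTimeMap.remove(key);
        return sentAt == null ? -1 : nowMillis - sentAt;
    }

    /** True if the ack arrived more than 10 seconds after the push's lastRefTime. */
    static boolean isLate(long nowNanos, long lastRefTime) {
        return nowNanos - lastRefTime > ACK_TIMEOUT_NANOS;
    }

    int unacked() {
        return ackMap.size();
    }
}
```

The key ties an ack to one specific push for one specific client address, so a duplicate or stale ack simply finds nothing to remove.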
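💖Summary
The server notifies clients over UDP. The client uses PushReceiver to listen for these notifications; when one arrives, it caches the data locally, assembles an ACK, and sends it back to the server over UDP. On the server side, each push schedules a delayed task that retries if no ACK has been received within 10 seconds. The Receiver listens for client ACKs and, when one arrives, removes the ackKey and its value so that the retry is skipped.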