💖Spring家族及微服务系列文章
✨【微服务】Nacos通知客户端服务变更以及重试机制
【微服务】SpringBoot监听器机制以及在Nacos中的应用✨【微服务】Nacos服务端完成微服务注册以及健康检查流程
✨【微服务】Nacos客户端微服务注册原理流程
✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理
✨【微服务】SpringBoot启动流程注册FeignClient
✨【微服务】SpringBoot启动流程初始化OpenFeign的入口
✨Spring Bean的生命周期
✨Spring事务原理
✨SpringBoot自动装配原理机制及过程
✨SpringBoot获取处理器流程
✨SpringBoot中处理器映射关系注册流程
✨Spring5.x中Bean初始化流程
✨Spring中Bean定义的注册流程
✨Spring的处理器映射器与适配器的架构设计
✨SpringMVC执行流程图解及源码
目录
💖Spring家族及微服务系列文章
💖前言
💖Nacos服务发现
✨流程图
✨服务发现的入口
💫SpringCloud原生项目spring-cloud-commons
💫Nacos是如何继承下来的?
💫NacosServiceDiscovery#getInstances()获取服务实例
✨NacosNamingService初始化流程
💖NacosNamingService构造初始化
💫ServiceInfoHolder构造初始化
💫NamingClientProxyDelegate构造初始化
💫NamingGrpcClientProxy构造初始化
✨从集成的client模块本地服务发现
💫获取服务实例列表
💖从本地缓存获取
💖NamingClientProxyDelegate组件
💖NamingGrpcClientProxy组件
💫处理订阅
💫请求注册中心
💖processServiceInfo(pushPacket.data)缓存服务到本地
✨RPC调用
💖前言
这篇文章就介绍下,服务发现的入口是什么?本地缓存数据结构、缓存时机、如果缓存中没有如何处理?启动rpcClient、如何判断是否启动、启动失败如何处理?gRPC调用注册中心、调用超时如何处理、调用失败如何处理?重试可以重试多少次、怎么保证、重试机制?CAS成功如何处理、失败呢?带着这些问题,下面我们来探究探究。
注意:Nacos源码版本为2.2.0
💖Nacos服务发现
✨流程图
✨服务发现的入口
💫SpringCloud原生项目spring-cloud-commons
你会发现@EnableDiscoveryClient注解也是在spring-cloud-commons项目,还有个discovery文件夹。我们本节注意下DiscoveryClient接口,以及其中声明的接口方法。SpringCloud是由几个关键项目组成的,spring-cloud-commons项目是其中之一。SpringCloud Alibaba也不是完全替代SpringCloud的,一些基本的规范还是继承下来了,做扩展等。
💫Nacos是如何继承下来的?
Nacos是通过自己的spring-cloud-alibaba-nacos-discovery项目去集成到SpringCloud的以及基于SpringBoot的自动装配机制集成到SpringBoot项目的。而服务发现方面,NacosDiscoveryClient 实现了spring-cloud-commons项目的DiscoveryClient接口,即Nacos中服务发现入口是NacosDiscoveryClient类。
点击方法继续跟进到下面的逻辑
💫NacosServiceDiscovery#getInstances()获取服务实例
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
// 获取配置文件组信息
String group = this.discoveryProperties.getGroup();
// 调用API模块中NamingService的selectInstances()方法,
// 引用是NacosNamingService的反射获取,之前文章已分析
List<Instance> instances = this.namingService().selectInstances(serviceId, group, true);
// 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例
return hostToServiceInstanceList(instances, serviceId);
}
主要逻辑:
- 获取配置文件组信息
- 调用API模块中NamingService接口的selectInstances()方法。引用是NacosNamingService的,通过反射获取,之前文章已详细分析。NacosNamingService是Nacos的client模块里面的一个组件,下面分析。
- 将Nacos的服务实例适配为SpringCloud的ServiceInstance服务实例
✨NacosNamingService初始化流程
它的构造方法是在NamingFactory通过反射方式调用的,上面也提到了。因为这个流程也是不小的,故在获取服务实例前先讲解。而它的初始化时间在服务注册阶段完成的,这里补充一下。
💖NacosNamingService构造初始化
public NacosNamingService(Properties properties) throws NacosException {
init(properties);
}
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
// 初始化服务持有者
this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, properties);
// 初始化client代理
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
调用通知中心将服务实例变更事件注册到发布者以及向事件通知者注册订阅者,构造初始化服务持有者,构造初始化client代理。
💫ServiceInfoHolder构造初始化
public ServiceInfoHolder(String namespace, String notifierEventScope, Properties properties) {
initCacheDir(namespace, properties);
// 初始化本地缓存serviceInfoMap
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
// 初始化故障转移反应堆
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
this.notifierEventScope = notifierEventScope;
}
它主要初始化本地缓存serviceInfoMap以及维护它,初始化故障转移反应堆等。
💫NamingClientProxyDelegate构造初始化
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties,
InstancesChangeNotifier changeNotifier) throws NacosException {
this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
changeNotifier);
this.serverListManager = new ServerListManager(properties, namespace);
this.serviceInfoHolder = serviceInfoHolder;
this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy(properties);
this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
// 构造初始化命名gRPC客户端代理
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
serviceInfoHolder);
}
构造初始化自己的一些字段,重点是//构造初始化命名gRPC客户端代理
💫NamingGrpcClientProxy构造初始化
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
super(securityProxy);
this.namespaceId = namespaceId;
this.uuid = UUID.randomUUID().toString();
this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
Map<String, String> labels = new HashMap<>();
labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
// 构造初始化命名客户端gRPC重做服务
this.redoService = new NamingGrpcRedoService(this);
start(serverListFactory, serviceInfoHolder);
}
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
rpcClient.serverListFactory(serverListFactory);
rpcClient.registerConnectionListener(redoService);
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
// 启动rpcClient,与注册中心建立连接,涉及到gRPC框架
rpcClient.start();
// 向通知中心注册该订阅者
NotifyCenter.registerSubscriber(this);
}
构造初始化命名客户端gRPC重做服务。启动rpcClient,与注册中心建立长连接,涉及到gRPC框架。向通知中心注册该订阅者。
✨从集成的client模块本地服务发现
本节点讲解的就是客户端服务发现,之所以这样说是因为SpringBoot的自动装配将Nacos的client模块集成进来了,想了解更多去看前面的文章分析。
💫获取服务实例列表
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
String clusterString = StringUtils.join(clusters, ",");
// 根据上面重载调用可知默认采用订阅方式
if (subscribe) {
// 从本地serviceInfoMap获取
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
// 本地没有,通过代理订阅
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
// 如果不是订阅方式则发送http查询获取
serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
}
// 负载均衡选取健康服务
return selectInstances(serviceInfo, healthy);
}
主要逻辑:
- 调用重载方法,默认选择健康服务、使用订阅模式
- 从本地缓存serviceInfoMap获取服务信息
- 本地没有,通过代理订阅,即调用注册中心;如果不是订阅则http调用注册中心
- 负载均衡选取健康服务
💖从本地缓存获取
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
String key = ServiceInfo.getKey(groupedServiceName, clusters);
// 如果配置了failoverSwitch为true
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
// 否则serviceInfoMap获取
return serviceInfoMap.get(key);
}
failoverSwitch默认为false,从本地缓存serviceInfoMap获取服务信息
💖NamingClientProxyDelegate组件
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
// 根据配置namingAsyncQuerySubscribeService决定是否调度更新
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
// 尝试从本地获取
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
// 本地没有还是,则gRPC远程调用注册中心获取
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
// 缓存到本地(提升系统性能)
serviceInfoHolder.processServiceInfo(result);
return result;
}
主要逻辑:
- 尝试从本地缓存获取
- 本地没有还是,则gRPC远程调用注册中心获取
- 缓存到本地(提升系统性能)
💖NamingGrpcClientProxy组件
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
}
// 缓存起来
redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
// 真正处理订阅
return doSubscribe(serviceName, groupName, clusters);
}
缓存服务名、分组、集群预备重做,调用下面方法处理订阅
💫处理订阅
public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
// 封装订阅服务请求数据
SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,
true);
// 请求注册中心
SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
redoService.subscriberRegistered(serviceName, groupName, clusters);
// 返回响应数据以便缓存到本地
return response.getServiceInfo();
}
主要逻辑:
- 封装订阅服务请求数据
- 请求注册中心
- 订阅已完成,返回响应数据以便缓存到本地
💫请求注册中心
// 请求服务器
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// RPC调用,requestTimeout请求超时时间
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
// 响应不是成功状态码
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (NacosException e) {
throw e;
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
主要逻辑:
- RPC调用,requestTimeout请求超时时间。下面RPC调用讲解
- 如果响应不是成功状态码,则抛异常
- 强制类型转换,返回响应数据以便缓存到本地
💖processServiceInfo(pushPacket.data)缓存服务到本地
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
// 缓存到本地serviceInfoMap
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 判断服务是否发送变更
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
// 通知中心发布事件
NotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
主要逻辑:
- serviceKey为空,结束缓存
- 空的或错误推送,不缓存
- 缓存到本地serviceInfoMap
- 判断服务是否发送变更,如果变更,调用通知中心发布服务实例变更事件
✨RPC调用
/**
* send request.这里的RPC调用具备重试功能
*
* @param request request.
* @return response from server.
*/
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
// 为什么会重试呢?发送失败重试,同时因为提供服务的,客户做生意哦,服务得可用
// RETRY_TIMES默认3,DEFAULT_TIMEOUT_MILLS默认3000L
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
// 这里可能抛超时异常
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
if (response instanceof ErrorResponse) {
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
// 连接未注册,异步切换服务器,
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.返回结果结束运行
lastActiveTimeStamp = System.currentTimeMillis();
return response;
} catch (Exception e) {
// 上面抛的异常这里捕捉了,所以不会影响下面的代码运行
if (waitReconnect) {
try {
// wait client to reconnect.等待客户端重新连接。
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
}
// 重试次数递增,小于3继续请求
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
// 则异步启动
switchServerAsyncOnRequestFail();
}
if (exceptionThrow != null) {
throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
: new NacosException(SERVER_ERROR, exceptionThrow);
} else {
throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
}
}
主要逻辑:
- 一个while循环,边界:RETRY_TIMES默认3次,DEFAULT_TIMEOUT_MILLS默认3000L即3秒
- 如果rpcClient没有连接到注册中心,抛异常
- 在当前连接处理请求,这里可能抛超时异常。底层基于gRPC框架、Future模式等。
- 如果响应结果空,抛异常
- 连接未注册,异步切换服务器。抛异常
- 正常响应,返回结果结束该方法运行
- 上面抛的异常这里catch捕捉了,所以不会影响下面的代码运行。等待客户端重新连接。
- 重试次数递增,小于3继续请求
- 如果重试3次了还是不行:1)CAS成功异步切换服务器。2)抛异常
这个是新的版本,有不少处良好的设计值得学习。