💖 Spring家族及微服务系列文章
✨【微服务】SpringCloud微服务续约源码解析
✨【微服务】SpringCloud微服务注册源码解析
✨【微服务】Nacos2.x服务发现?RPC调用?重试机制?
✨【微服务】Nacos通知客户端服务变更以及重试机制
✨【微服务】Nacos服务发现源码分析
✨【微服务】SpringBoot监听器机制以及在Nacos中的应用
✨【微服务】Nacos服务端完成微服务注册以及健康检查流程
✨【微服务】Nacos客户端微服务注册原理流程
✨【微服务】SpringCloud中使用Ribbon实现负载均衡的原理
✨【微服务】SpringBoot启动流程注册FeignClient
✨【微服务】SpringBoot启动流程初始化OpenFeign的入口
✨Spring Bean的生命周期
✨Spring事务原理
✨SpringBoot自动装配原理机制及过程
✨SpringBoot获取处理器流程
✨SpringBoot中处理器映射关系注册流程
✨Spring5.x中Bean初始化流程
✨Spring中Bean定义的注册流程
✨Spring的处理器映射器与适配器的架构设计
✨SpringMVC执行流程图解及源码
目录
💖 Spring家族及微服务系列文章
一、前言
二、微服务剔除下线源码解析
1、EurekaBootStrap#contextInitialized()
1.1、初始化注册中心上下文
1.2、openForTraffic()逻辑
1.3、postInit()执行任务
1.4、剔除任务
2、服务剔除下线
2.1、AbstractInstanceRegistry#evict()逻辑
2.1、判断是否过期
2.2、从本地列表异常下线处理
一、前言
上一篇SpringCloud微服务续约源码解析已经分析了心跳机制是什么、底层实现、客户端发送心跳的主要代码、注册中心处理心跳的过程,这节跟它是紧密关联的。联系的枢纽就是lastUpdateTimestamp最后更新时间戳,它是Lease租约类的一个用volatile关键字修饰的对其他线程透明可见的字段。那么Eureka是如何使用该字段判断服务是否过期的?然后进行服务的剔除下线?需要借助什么机制?该机制是什么时候能触发的?带着这些问题,我们下面来探究一番:
二、微服务剔除下线源码解析
EurekaBootStrap是Eureka项目里面的,用于启动Eureka服务器的类:
Eureka 服务器使用类路径中eureka.server.props指定的EurekaServerConfig进行配置。Eureka客户端组件也是通过使用eureka.client.props指定的配置 EurekaInstanceConfig初始化的。如果服务器在AWS云中运行,则eureka服务器将其绑定到指定的弹性ip。
1、EurekaBootStrap#contextInitialized()
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
// 初始化注册中心上下文
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
它这里也使用了事件机制,但是不是基于Spring的,感兴趣的可以去了解下。初始化注册中心上下文,即下面的处理逻辑:
1.1、初始化注册中心上下文
protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
ApplicationInfoManager applicationInfoManager = null;
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();
logger.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
做一些初始化工作,重点关注registry.openForTraffic(applicationInfoManager, registryCount);的调用,进入下面处理逻辑:
1.2、openForTraffic()逻辑
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 更新每30秒发生一次,一分钟应该是2倍。
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
// 更改服务实例状态为UP
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 调用父类初始化
super.postInit();
}
更改服务实例状态为UP,调用父类初始化。
1.3、postInit()执行任务
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
终于来到剔除任务了,前面说了什么,就是一些初始化的工作。它这里的执行器是Timer,跟Nacos不一样,区别的话感兴趣的就自行去搞个明白。我们进入下面的分析:
1.4、剔除任务
EvictionTask是TimerTask类型任务。
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
/**
* 计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。
* 这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况
* 非常有用。
*/
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}
long getCurrentTimeNano() { // for testing
return System.nanoTime();
}
}
主要逻辑:
- 计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况 非常有用。
- 调用evict(compensationTimeMs)剔除处理,下面分析:
2、服务剔除下线
2.1、AbstractInstanceRegistry#evict()逻辑
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
// DS: 租约到期目前已禁用。
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// 我们首先收集所有过期的物品,以随机的顺序驱逐它们。对于大型驱逐集,如果我们不这样做,
// 我们可能会在自我保护启动之前删除整个应用程序。通过随机化,影响应该均匀地分布在所有应用程序中。
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 判断租约是否过期
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
// 添加到过期续租集合
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// 为了补偿 GC 暂停或漂移的本地时间,我们需要使用当前的注册表大小作为触发自我保存的基础。
// 没有这个,我们就会清除整个注册表。
// 获取注册表租约总数
int registrySize = (int) getLocalRegistrySize();
// 计算注册表租约的阈值 (总数乘以 续租百分比 默认85%),得出要续租的数量
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// 理论要剔除的数量 = 总数-要续租的数量
int evictionLimit = registrySize - registrySizeThreshold;
// 实际剔除的数量 = min(实际租期到期服务实例个数,理论剔除数量)
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
// 将要剔除数量大于0,把它们下线处理,从本地注册表移除掉以保证高可用
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// 选择一个随机的项目(Knuth 洗牌算法)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
// 注册表: {}/{}的租约已过期
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// 服务下线
internalCancel(appName, id, false);
}
}
}
主要逻辑:
- 判断租约到期是否禁用,如果禁用return。默认启用
- 首先收集所有过期的租约,以随机的顺序剔除它们。对于大型剔除集,如果不这样做,可能会在自我保护启动之前删除整个应用程序。通过随机化,影响应该均匀地分布在所有应用程序中。判断租约是否过期,如果过期添加到过期租约集合,继续遍历到。
- 为了补偿 GC 暂停或漂移的本地时间,需要使用当前的注册表大小作为触发自我保存的基础。没有这个,就会清除整个注册表。1)获取注册表租约总数;2)计算注册表租约的阈值 (总数乘以 续租百分比 默认85%),得出要续租的数量;3)理论要剔除的数量 = 总数-要续租的数量;4)实际剔除的数量 = min(实际租期到期服务实例个数,理论剔除数量);
- 将要剔除数量大于0,把它们下线处理,从本地注册表移除掉以保证高可用:选择一个随机的项目(Knuth 洗牌算法),调用internalCancel(appName, id, false)下线处理。
2.1、判断是否过期
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
如果是cancel()处理前面的值就大于0,一般是判断后面部分逻辑:如果当前系统时间戳小于后面的时间戳之和,则没有过期;否则大于就是过期了。
2.2、从本地列表异常下线处理
cancel(String,String,boolean)方法被PeerAwareInstanceRegistry重写了,因此每个取消请求都被复制到对等点。然而,对于在远程对等点中被视为有效取消的过期,这是不需要的,因此自我保存模式不会启用。
protected boolean internalCancel(String appName, String id, boolean isReplication) {
// 加锁
read.lock();
try {
CANCEL.increment(isReplication);
// 根据appName从本地注册表获取租约服务实例
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
// 根据唯一ID从本地移除服务实例,下线
leaseToCancel = gMap.remove(id);
}
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
// 下线失败,因为租约信息中不存在该服务实例
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
// 通过更新剔除时间取消租约。
leaseToCancel.cancel();
// 从租约获取服务实例
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// 使特定应用程序的缓存失效
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
// 释放锁
read.unlock();
}
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
主要逻辑:
- 获取锁后,根据appName从本地注册表获取租约服务实例
- 根据唯一ID从本地移除服务实例,下线
- 如果需下线租约信息为空,则下线失败,因为租约信息中不存在该服务实例,return假;否则可能通过更新剔除时间取消租约,从租约获取服务实例以便使特定应用程序的缓存失效
- 释放锁