大纲
7.服务端对服务实例进行健康检查
8.服务下线如何注销注册表和客户端等信息
9.事件驱动架构源码分析
一.处理ClientChangedEvent事件
也就是同步数据到集群节点:
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
...
@Override
public void onEvent(Event event) {
...
if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else {
syncToAllServer((ClientEvent) event);
}
}
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
//Only ephemeral data sync by Distro, persist client should sync by raft.
//临时实例使用Distro协议,持久化实例使用Raft协议
//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
//如果event是客户端注销实例时需要进行集群节点同步的事件
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
//如果event是客户端注册实例时需要进行集群节点同步的事件
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
...
}
@Component
public class DistroProtocol {
private final ServerMemberManager memberManager;
private final DistroTaskEngineHolder distroTaskEngineHolder;
...
//Start to sync by configured delay.
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
//Start to sync data to all remote server.
public void sync(DistroKey distroKey, DataOperation action, long delay) {
//遍历集群中除自身节点外的其他节点
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
//Start to sync to target server.
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
//先把要同步的集群节点targetServer包装成DistroKey对象
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
//然后根据DistroKey对象创建DistroDelayTask任务
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
//接着调用NacosDelayTaskExecuteEngine.addTask()方法
//往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTask
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
...
}
二.处理ClientDeregisterServiceEvent事件
也就是移除注册表 + 订阅表的服务实例:
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
...
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
//处理客户端注册事件ClientRegisterServiceEvent
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
//处理客户端注销事件ClientDeregisterServiceEvent
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//处理客户端订阅服务事件ClientSubscribeServiceEvent
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
//处理客户端取消订阅事件ClientUnsubscribeServiceEvent
removeSubscriberIndexes(service, clientId);
}
}
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
//移除注册表中的服务实例
publisherIndexes.get(service).remove(clientId);
//发布服务改变事件ServiceChangedEvent
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
...
}
三.处理ServiceChangeEvent事件
也就是通知订阅了该服务的客户端:
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {
...
@Override
public void onEvent(Event event) {
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ServiceEvent.ServiceChangedEvent) {
//If service changed, push to all subscribers.
//如果服务变动,会向Service服务的所有订阅者推送Service服务的实例信息,让订阅者(客户端)更新本地缓存
ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;
Service service = serviceChangedEvent.getService();
//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));
} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {
//If service is subscribed by one client, only push this client.
//如果Service服务被一个客户端订阅,则只推送Service服务的实例信息给该客户端
ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;
Service service = subscribedEvent.getService();
//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务
delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));
}
}
...
}
(3)总结
9.事件驱动架构源码分析
(1)如何使用Nacos的事件发布
(2)Nacos通知中心的事件发布源码
(3)Nacos通知中心注册订阅者的源码
Nacos 2.x大量使用了事件发布的动作,比如客户端注册服务实例、客户端下线服务实例、服务改变、服务订阅等。
(1)如何使用Nacos的事件发布
一.首先自定义一个事件
下面定义了一个名为TestEvent的事件,继承自Nacos的Event类。
import com.alibaba.nacos.common.notify.Event;
public class TestEvent extends Event {
}
二.然后定义一个订阅者
有了事件之后,还需要一个订阅者,这样发布的事件才能被这个订阅者进行处理。
自定义的订阅者需要继承Nacos的SmartSubscriber抽象类,自定义的订阅者需要实现三个方法。
方法一:构造方法
需要将自定义的订阅者注册到Nacos的通知中心NotifyCenter里,这样NotifyCenter在发布自定义事件时,才能让自定义的订阅者进行响应。
方法二:subscribeTypes()方法
实现该方法时,需要把自定义的事件添加到方法的返回结果中,所以可以通过该方法获取自定义订阅者监听了哪些事件。
方法三:onEvent()方法
Nacos的通知中心NotifyCenter在发布自定义事件时,便会调用该方法,所以该方法中需要实现自定义订阅者对自定义事件的处理。
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;
//自定义的订阅者需要继承Nacos的SmartSubscriber抽象类
@Component
public class TestSubscriber extends SmartSubscriber {
//构造方法中需要将自定义的订阅者TestSubscriber注册到Nacos的通知中心NotifyCenter
public TestSubscriber() {
NotifyCenter.registerSubscriber(this);
}
//实现subscribeTypes()方法时,把自定义的事件TestEvent添加进去返回
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(TestEvent.class);
return result;
}
//实现onEvent()方法
//当Nacos的通知中心NotifyCenter发布一个TestEvent事件时,就会响应该方法处理订阅者的逻辑
@Override
public void onEvent(Event event) {
System.out.println("TestSubscriber onEvent");
}
}
三.最后通过Nacos的通知中心NotifyCenter发布自定义事件
这样便完成了自定义事件、自定义订阅者通过Nacos实现发布订阅功能。
@RestController
@RequestMapping("/sub/")
public class SubscriberController {
@GetMapping("/test")
public void test() {
NotifyCenter.publishEvent(new TestEvent());
}
}
(2)Nacos通知中心的事件发布源码
通知中心NotifyCenter执行publishEvent()方法发布事件时,比如会调用DefaultPublisher的publish()方法来发布事件。
DefaultPublisher的publish()方法会先把事件放入到一个阻塞队列queue中,而在DefaultPublisher创建时会启动一个线程从阻塞队列取出事件来处理。处理时就会调用到DefaultPublisher的receiveEvent()方法通知事件订阅者,也就是执行DefaultPublisher的notifySubscriber()方法通知事件订阅者。
在DefaultPublisher的notifySubscriber()方法中,首先会创建一个调用订阅者的onEvent()方法的任务,然后如果订阅者有线程池,则将任务提交给订阅者的线程池去执行。如果订阅者没有线程池,则直接执行该任务。
可见事件的发布也使用了阻塞队列 + 异步任务,来实现对订阅者的通知。
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
//key是事件Class的canonicalName,value是EventPublisher对象,一个事件对应一个EventPublisher对象
//在EventPublisher对象中就会包含订阅了该事件的所有订阅者
//EventPublisher的实现类有DefaultPublisher、NamingEventPublisher
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
...
//Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published.
public static boolean publishEvent(final Event event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
LOGGER.error("There was an exception to the message publishing : ", ex);
return false;
}
}
//Request publisher publish event Publishers load lazily, calling publisher.
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
//获取发布的事件的Class的canonicalName
final String topic = ClassUtils.getCanonicalName(eventType);
//根据发布事件类型获取EventPublisher对象,该对象中会包含所发布事件的所有订阅者信息
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
//比如调用DefaultPublisher.publish()方法发布事件
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
...
}
//The default event publisher implementation.
//一个事件只会对应一个DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher {
private Class<? extends Event> eventType;
//阻塞队列存放待发布的事件
private BlockingQueue<Event> queue;
//Class为eventType的事件的所有订阅者
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();
@Override
public void init(Class<? extends Event> type, int bufferSize) {
...
start();
}
@Override
public synchronized void start() {
if (!initialized) {
super.start();
...
}
}
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
...
for (; ;) {
...
//从阻塞队列取数据
final Event event = queue.take();
//处理事件
receiveEvent(event);
...
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
...
@Override
public boolean publish(Event event) {
checkIsStart();
//把事件放入到了一个阻塞队列queue中,由DefaultPublisher创建时启动的线程来处理
boolean success = this.queue.offer(event);
if (!success) {//如果事件放入阻塞队列失败,则直接处理
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
//通知事件的订阅者去进行事件处理
receiveEvent(event);
return true;
}
return true;
}
//通知事件的订阅者去进行事件处理
void receiveEvent(Event event) {
...
//遍历当前事件的订阅者,对订阅者执行notifySubscriber()方法,实际上就是执行订阅者的onEvent()方法
for (Subscriber subscriber : subscribers) {
...
//触发执行订阅者的onEvent()方法,实现对订阅者的通知
notifySubscriber(subscriber, event);
}
}
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
//创建一个任务,该任务会调用订阅者的onEvent方法
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
//将任务提交给订阅者的线程池去执行
executor.execute(job);
} else {
try {
//如果订阅者没有线程池,则直接执行该任务
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
...
}
(3)Nacos通知中心注册订阅者的源码
在执行NotifyCenter的registerSubscriber()方法注册订阅者时,会调用订阅者实现的subscribeTypes()方法获取订阅者要监听的所有事件,然后遍历这些事件并调用NotifyCenter的addSubscriber()方法。
执行NotifyCenter的addSubscriber()方法时会为这些事件添加订阅者。由于每个事件都会对应一个EventPublisher对象,所以会先从NotifyCenter.publisherMap中获取EventPublisher对象,然后调用EventPublisher的addSubscriber()方法向EventPublisher添加订阅者,从而完成向通知中心注册订阅者。
public class NotifyCenter {
private static final NotifyCenter INSTANCE = new NotifyCenter();
//key是事件Class的canonicalName,value是EventPublisher对象,一个事件对应一个EventPublisher对象
//在EventPublisher对象中就会包含订阅了该事件的所有订阅者
//EventPublisher的实现类有DefaultPublisher、NamingEventPublisher
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
...
public static void registerSubscriber(final Subscriber consumer) {
//注册订阅者
registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);
}
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
if (consumer instanceof SmartSubscriber) {
//调用subscribeTypes()方法获取订阅者consumer需要监听的事件,然后对这些事件进行遍历
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
//For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
//添加订阅者
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
//For case, producer: defaultPublisher -> consumer: subscriber.
//添加订阅者
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
addSubscriber(consumer, subscribeType, factory);
}
//Add a subscriber to publisher.
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType, EventPublisherFactory factory) {
//获取订阅的事件的Class的canonicalName
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
//MapUtils.computeIfAbsent is a unsafe method.
//创建EventPublisher对象,一个事件会对应一个EventPublisher对象
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
//获取事件对应的EventPublisher对象,比如DefaultPublisher对象
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
//往EventPublisher对象添加订阅者信息,比如调用DefaultPublisher.addSubscriber()方法
publisher.addSubscriber(consumer);
}
}
...
}
//一个事件只会对应一个DefaultPublisher
public class DefaultPublisher extends Thread implements EventPublisher {
private Class<? extends Event> eventType;
//Class为eventType的事件的所有订阅者
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();
...
@Override
public void addSubscriber(Subscriber subscriber) {
//添加订阅者
subscribers.add(subscriber);
}
...
}