Nacos源码—7.Nacos升级gRPC分析四

news2025/7/18 15:19:21

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

6.微服务实例信息如何同步集群节点

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

(2)ClientChangedEvent事件的处理源码

(3)集群节点处理数据同步请求的源码

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群节点同步服务实例数据的。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名
        Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                //注册实例
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                //注销实例
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;
        //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名
        //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息
        //参数meta.getConnectionId():这个参数很关键,它是连接ID
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    ...
}

@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    private final ClientManager clientManager;
    
    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }
    
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
        }
        //从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //将请求中的instance实例信息封装为InstancePublishInfo对象
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法
        client.addServiceInstance(singleton, instanceInfo);
        //设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间
        client.setLastUpdatedTime();
        //发布客户端注册服务实例的事件
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        //发布服务实例元数据的事件
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    ...
}

//Nacos naming client based ip and port.
//The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
public class IpPortBasedClient extends AbstractClient {
    ...
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
    }
    ...
}

//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {
    //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务
    //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象
    //key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    //subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务
    //subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    ...
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        //服务注册时,如果是第一次put进去Service对象,会返回null
        if (null == publishers.put(service, instancePublishInfo)) {
            //监视器记录
            MetricsMonitor.incrementInstanceCount();
        }
        //发布客户端改变事件,用于处理集群间的数据同步
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
    ...
}

(2)ClientChangedEvent事件的处理源码

DistroClientDataProcessor的onEvent()方法会响应ClientChangedEvent。该方法如果判断出事件类型为ClientChangedEvent事件,那么就会执行DistroClientDataProcessor的syncToAllServer()方法,然后调用DistroProtocol的sync()方法进行集群节点同步处理。

DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点执行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟任务,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟任务执行引擎的tasks中添加任务。

NacosDelayTaskExecuteEngine在初始化时会启动一个定时任务,这个定时任务会定时执行ProcessRunnable的run()方法。而ProcessRunnable的run()方法会不断从任务池tasks中取出延迟任务处理,处理DistroDelayTask任务时会调用DistroDelayTaskProcessor的process()方法。

在执行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask任务封装一个DistroSyncChangeTask任务,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。也就是调用TaskExecuteWorker的process()方法,将DistroSyncChangeTask任务添加到TaskExecuteWorker的阻塞队列中,同时创建TaskExecuteWorker时会启动线程不断从队列中取出任务处理。因此最终会执行DistroSyncChangeTask的run()方法。

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    private final ClientManager clientManager;
    private final DistroProtocol distroProtocol;
    ...
    @Override
    public void onEvent(Event event) {
        ...
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }
    
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        //Only ephemeral data sync by Distro, persist client should sync by raft.
        //临时实例使用Distro协议,持久化实例使用Raft协议
        //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            //如果event是客户端注销实例时需要进行集群节点同步的事件
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            //如果event是客户端注册实例时需要进行集群节点同步的事件
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }
    ...
}

@Component
public class DistroProtocol {
    private final ServerMemberManager memberManager;
    private final DistroTaskEngineHolder distroTaskEngineHolder;
    ...
    //Start to sync by configured delay.
    public void sync(DistroKey distroKey, DataOperation action) {
        sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
    }
    
    //Start to sync data to all remote server.
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        //遍历集群中除自身节点外的其他节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }
    
    //Start to sync to target server.
    public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
        //先把要同步的集群节点targetServer包装成DistroKey对象
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
        //然后根据DistroKey对象创建DistroDelayTask任务
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //接着调用NacosDelayTaskExecuteEngine.addTask()方法
        //往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTask
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
        }
    }
    ...
}

//延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
    private final ScheduledExecutorService processingExecutor;
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池
   
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        tasks = new ConcurrentHashMap<>(initCapacity);
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        //开启定时任务,即启动ProcessRunnable线程任务
        processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    ...
    @Override
    public void addTask(Object key, AbstractDelayTask newTask) {
        lock.lock();
        try {
            AbstractDelayTask existTask = tasks.get(key);
            if (null != existTask) {
                newTask.merge(existTask);
            }
            //最后放入到任务池中
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }
    
    protected void processTasks() {
        //获取tasks中所有的任务,然后进行遍历
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            //通过任务key,获取具体的任务,并且从任务池中移除掉
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //通过任务key获取对应的NacosTaskProcessor延迟任务处理器
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                //ReAdd task if process failed
                //调用获取到的NacosTaskProcessor延迟任务处理器的process()方法
                if (!processor.process(task)) {
                    //如果失败了,会重试添加task回tasks这个map中
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error ", e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    ...
    private class ProcessRunnable implements Runnable {
        @Override
        public void run() {
            try {
                processTasks();
            } catch (Throwable e) {
                getEngineLog().error(e.toString(), e);
            }
        }
    }
}

//Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
    ...
    @Override
    public boolean process(NacosTask task) {
        if (!(task instanceof DistroDelayTask)) {
            return true;
        }
        DistroDelayTask distroDelayTask = (DistroDelayTask) task;
        DistroKey distroKey = distroDelayTask.getDistroKey();
        switch (distroDelayTask.getAction()) {
            case DELETE:
                //处理客户端注销实例时的延迟任务(同步数据到集群节点)
                //根据DistroDelayTask任务封装一个DistroSyncTask任务
                DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                //调用NacosExecuteTaskExecuteEngine.addTask()方法
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                return true;
            case CHANGE:
            case ADD:
                //处理客户端注册实例时的延迟任务(同步数据到集群节点)
                //根据DistroDelayTask任务封装一个DistroSyncChangeTask任务
                DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                //调用NacosExecuteTaskExecuteEngine.addTask()方法
                distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                return true;
            default:
                return false;
        }
    }
}

//任务执行引擎
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    private final TaskExecuteWorker[] executeWorkers;
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    ...
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        //根据tag获取到TaskExecuteWorker
        NacosTaskProcessor processor = getProcessor(tag);
        if (null != processor) {
            processor.process(task);
            return;
        }
        TaskExecuteWorker worker = getWorker(tag);
        //调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去
        worker.process(task);
    }
    
    private TaskExecuteWorker getWorker(Object tag) {
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }    
    ...
}

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
    private final BlockingQueue<Runnable> queue;//任务存储容器
    
    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        ...
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        new InnerWorker(name).start();
    }
    
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            //把NacosTask任务放入到阻塞队列中
            putTask((Runnable) task);
        }
        return true;
    }
    
    private void putTask(Runnable task) {
        try {
            //把NacosTask任务放入到阻塞队列中
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }
    ...
    private class InnerWorker extends Thread {
        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }
        
        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    //一直取阻塞队列中的任务
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    //调用NacosTask中的run方法
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

执行DistroSyncChangeTask的run()方法,其实就是执行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法会先获取请求数据,然后调用DistroClientTransportAgent的syncData()方法同步集群节点,也就是调用ClusterRpcClientProxy的sendRequest()方法发送数据同步请求,最终会调用RpcClient的request()方法 -> GrpcConnection的request()方法。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
    ...
    @Override
    protected void doExecuteWithCallback(DistroCallback callback) {
        String type = getDistroKey().getResourceType();
        //获取请求数据
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return;
        }
        //默认调用DistroClientTransportAgent.syncData()方法同步集群节点
        getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
    }
    
    private DistroData getDistroData(String type) {
        DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
        if (null != result) {
            result.setType(OPERATION);
        }
        return result;
    }
    ...
}

public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
    ...
    @Override
    public void run() {
        //Nacos:Naming:v2:ClientData
        String type = getDistroKey().getResourceType();
        //获取DistroClientTransportAgent对象
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        //默认返回true
        if (transportAgent.supportCallbackTransport()) {
            //默认执行子类的doExecuteWithCallback()方法
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }
    
    protected abstract void doExecuteWithCallback(DistroCallback callback);
    ...
}

public class DistroClientTransportAgent implements DistroTransportAgent {
    private final ClusterRpcClientProxy clusterRpcClientProxy;
    private final ServerMemberManager memberManager;
    ...
    @Override
    public boolean syncData(DistroData data, String targetServer) {
        if (isNoExistTarget(targetServer)) {
            return true;
        }
        //创建请求对象
        DistroDataRequest request = new DistroDataRequest(data, data.getType());
        //找到集群节点
        Member member = memberManager.find(targetServer);
        if (checkTargetServerStatusUnhealthy(member)) {
            Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
            return false;
        }
        try {
            //向集群节点发送RPC异步请求
            Response response = clusterRpcClientProxy.sendRequest(member, request);
            return checkResponse(response);
        } catch (NacosException e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
        }
        return false;
    }
    ...
}

@Service
public class ClusterRpcClientProxy extends MemberChangeListener {
    ...
    //send request to member.
    public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
        RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
        if (client != null) {
            //调用RpcClient.request()方法
            return client.request(request, timeoutMills);
        } else {
            throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
        }
    }
    ...
}

public abstract class RpcClient implements Closeable {
    //在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,
    //会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性
    protected volatile Connection currentConnection;
    ...
    //send request.
    public Response request(Request request, long timeoutMills) throws NacosException {
        int retryTimes = 0;
        Response response;
        Exception exceptionThrow = null;
        long start = System.currentTimeMillis();
        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
            ...
            //发起gRPC请求,调用GrpcConnection.request()方法
            response = this.currentConnection.request(request, timeoutMills);
            ...
        }
        ...
    }
    ...
}

(3)集群节点处理数据同步请求的源码

通过DistroClientTransportAgent的syncData()方法发送的数据同步请求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是最终会调用到DistroClientDataProcessor.processData()方法。

在执行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则执行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会和处理服务注册时一样,发布一个客户端注册服务实例的事件。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法。首先移除客户端对象信息,然后发布一个客户端注销服务实例的事件。

其中客户端注销服务实例的事件ClientDisconnectEvent,首先会被ClientServiceIndexesManager的onEvent()方法进行处理,处理时会调用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager订阅者列表的元素和注册表的元素。然后会被DistroClientDataProcessor的onEvent()方法进行处理,进行集群节点之间的数据同步。

@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
    private final DistroProtocol distroProtocol;
    ...
    @Override
    public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
        try {
            switch (request.getDataOperation()) {
                case VERIFY:
                    return handleVerify(request.getDistroData(), meta);
                case SNAPSHOT:
                    return handleSnapshot();
                case ADD:
                case CHANGE:
                case DELETE:
                    //服务实例新增、修改、删除的同步,都会由DistroDataRequestHandler.handleSyncData()方法处理
                    return handleSyncData(request.getDistroData());
                case QUERY:
                    return handleQueryData(request.getDistroData());
                default:
                    return new DistroDataResponse();
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
            DistroDataResponse result = new DistroDataResponse();
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("handle distro request with exception");
            return result;
        }
    }
    
    private DistroDataResponse handleSyncData(DistroData distroData) {
        DistroDataResponse result = new DistroDataResponse();
        //调用DistroProtocol.onReceive()方法
        if (!distroProtocol.onReceive(distroData)) {
            result.setErrorCode(ResponseCode.FAIL.getCode());
            result.setMessage("[DISTRO-FAILED] distro data handle failed");
        }
        return result;
    }
    ...
}

@Component
public class DistroProtocol {
    ...
    //Receive synced distro data, find processor to process.
    public boolean onReceive(DistroData distroData) {
        Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
        //Nacos:Naming:v2:ClientData
        String resourceType = distroData.getDistroKey().getResourceType();
        //获取DistroClientDataProcessor处理对象
        DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
        if (null == dataProcessor) {
            Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
            return false;
        }
        //调用DistroClientDataProcessor.processData()方法
        return dataProcessor.processData(distroData);
    }
    ...
}

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    ...
    @Override
    public boolean processData(DistroData distroData) {
        switch (distroData.getType()) {
            case ADD:
            case CHANGE:
                //服务实例添加和改变时的执行逻辑
                ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);
                handlerClientSyncData(clientSyncData);
                return true;
            case DELETE:
                //服务实例删除时的执行逻辑
                String deleteClientId = distroData.getDistroKey().getResourceKey();
                Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                //调用EphemeralIpPortClientManager.clientDisconnected()方法
                clientManager.clientDisconnected(deleteClientId);
                return true;
            default:
                return false;
        }
    }
    
    private void handlerClientSyncData(ClientSyncData clientSyncData) {
        Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
        clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
        Client client = clientManager.getClient(clientSyncData.getClientId());
        upgradeClient(client, clientSyncData);
    }
    
    private void upgradeClient(Client client, ClientSyncData clientSyncData) {
        List<String> namespaces = clientSyncData.getNamespaces();
        List<String> groupNames = clientSyncData.getGroupNames();
        List<String> serviceNames = clientSyncData.getServiceNames();
        List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
        Set<Service> syncedService = new HashSet<>();
        
        for (int i = 0; i < namespaces.size(); i++) {
            Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
            Service singleton = ServiceManager.getInstance().getSingleton(service);
            syncedService.add(singleton);
            InstancePublishInfo instancePublishInfo = instances.get(i);
            //如果和当前不一样才发布事件
            if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
                client.addServiceInstance(singleton, instancePublishInfo);
                //发布客户端注册服务实例的事件,与客户端进行服务注册时一样
                NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
            }
        }
        
        for (Service each : client.getAllPublishedService()) {
            if (!syncedService.contains(each)) {
                client.removeServiceInstance(each);
                NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
            }
        }
    }
    ...
}

@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    ...
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        //移除客户端信息
        IpPortBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        //发布客户端注销服务实例的事件
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        client.release();
        return true;
    }
    ...
}

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {
    //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    ...
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
    
    private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
        Client client = event.getClient();
        for (Service each : client.getAllSubscribeService()) {
            //移除订阅者列表的元素
            removeSubscriberIndexes(each, client.getClientId());
        }
        for (Service each : client.getAllPublishedService()) {
            //移除注册表的元素
            removePublisherIndexes(each, client.getClientId());
        }
    }
    ...
}

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
    ...
    @Override
    public void onEvent(Event event) {
        ...
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }
    
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        //Only ephemeral data sync by Distro, persist client should sync by raft.
        //临时实例使用Distro协议,持久化实例使用Raft协议
        //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            //如果event是客户端注销实例时需要进行集群节点同步的事件
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            //如果event是客户端注册实例时需要进行集群节点同步的事件
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }
    ...
}

(4)总结

一.执行引擎的总结

延时任务执行引擎的实现原理是引擎有一个Map类型的tasks任务池,这个任务池可以根据key映射对应的任务处理器。引擎会定时从任务池中获取任务,执行任务处理器的处理方法处理任务。

任务执行引擎的实现原理是会创建多个任务执行Worker,每个任务执行Worker都会有一个阻塞队列。向任务执行引擎添加任务时会将任务添加到其中一个Woker的阻塞队列中,Worker在初始化时就会启动一个线程不断取出阻塞队列中的任务来处理。所以任务执行引擎会通过阻塞队列 + 异步任务的方式来实现。

二.用于向集群节点同步数据的客户端改变事件的处理流程总结

步骤一:先创建DistroDelayTask延迟任务放入到延迟任务执行引擎的任务池,DistroDelayTask延迟任务会由DistroDelayTaskProcessor处理器处理。

步骤二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask任务,然后再将任务分发添加到执行引擎中的任务执行Worker的阻塞队列中。

步骤三:任务执行Worker会从队列中获取并执行DistroSyncChangeTask任务,也就是执行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。

步骤四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步请求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2372585.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TIME - MoE 模型代码 3.2——Time-MoE-main/time_moe/datasets/time_moe_dataset.py

源码&#xff1a;GitHub - Time-MoE/Time-MoE: [ICLR 2025 Spotlight] Official implementation of "Time-MoE: Billion-Scale Time Series Foundation Models with Mixture of Experts" 这段代码定义了一个用于时间序列数据处理的 TimeMoEDataset 类&#xff0c;支…

【某OTA网站】phantom-token 1004

新版1004 phantom-token 请求头中包含phantom-token 定位到 window.signature 熟悉的vmp 和xhs一样 最新环境检测点 最新检测 canvas 下的 toDataURL方法较严 过程中 会用setAttribute给canvas 设置width height 从而使toDataURL返回不同的值 如果写死toDataURL的返回值…

OrangePi Zero 3学习笔记(Android篇)2 - 第一个C程序

目录 1. 创建项目文件夹 2. 创建c/cpp文件 3. 创建Android.mk/Android.bp文件 3.1 Android.mk 3.2 Android.bp 4. 编译 5. adb push 6. 打包到image中 在AOSP里面添加一个C或C程序&#xff0c;这个程序在Android中需要通过shell的方式运行。 1. 创建项目文件夹 首先需…

DeepResearch深度搜索实现方法调研

DeepResearch深度搜索实现方法调研 Deep Research 有三个核心能力 能力一&#xff1a;自主规划解决问题的搜索路径&#xff08;生成子问题&#xff0c;queries&#xff0c;检索&#xff09;能力二&#xff1a;在探索路径时动态调整搜索方向&#xff08;刘亦菲最好的一部电影是…

【论文阅读】基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别

Efficient distribution similarity identification in clustered federated learning via principal angles between client data subspaces -- 基于客户端数据子空间主角度的聚类联邦学习分布相似性高效识别 论文来源TLDR背景与问题两个子空间之间的主角&#xff08;Principa…

Elasticsearch知识汇总之ElasticSearch部署

五 ElasticSearch部署 部署Elasticsearch&#xff0c;可以在任何 Linux、MacOS 或 Windows 机器上运行 Elasticsearch。在Docker 容器 中运行 Elasticsearch 。使用Elastic Cloud on Kubernetes 设置和管理 Elasticsearch、Kibana、Elastic Agent 以及 Kubernetes 上的 Elasti…

ROBOVERSE:面向可扩展和可泛化机器人学习的统一平台、数据集和基准

25年4月来自UC Berkeley、北大、USC、UMich、UIUC、Stanford、CMU、UCLA 和 北京通用 AI 研究院&#xff08;BIGAI&#xff09;的论文“ROBOVERSE: Towards a Unified Platform, Dataset and Benchmark for Scalable and Generalizable Robot Learning”。 数据扩展和标准化评…

(41)VTK C++开发示例 ---qt使用vtk最小示例

文章目录 1. 概述2. CMake链接VTK3. main.cpp文件4. 演示效果 更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;VTK开发 &#x1f448; 1. 概述 本文演示了在Qt中使用VTK的最小示例程序&#xff0c;使用VTK创建显示一个锥体&#xff1b; 采用Cmake作为构建工具&a…

OS7.【Linux】基本指令入门(6)

目录 1.zip和unzip 配置指令 使用 两个名词:打包和压缩 打包 压缩 Linux下的操作演示 压缩和解压缩文件 压缩和解压缩目录 -d选项 2.tar Linux下的打包和压缩方案简介 czf选项 xzf选项 -C选项 tzf选项 3.bc 4.uname 不带选项的uname -a选项 -r选项 -v选项…

国标GB28181视频平台EasyCVR安防系统部署知识:如何解决异地监控集中管理和组网问题

在企业、连锁机构及园区管理等场景中&#xff0c;异地监控集中管控与快速组网需求日益迫切。弱电项目人员和企业管理者亟需整合分散监控资源&#xff0c;实现跨区域统一管理与实时查看。 一、解决方案 案例一&#xff1a;运营商专线方案​ 利用运营商专线&#xff0c;连接各分…

O2O上门服务如何颠覆传统足浴行业?真实案例分析

在湖南经营传统足浴店的张总最近遇到了件让他哭笑不得的事。原本他的门店生意还算稳定&#xff0c;虽然这两年行情不好&#xff0c;但靠着老顾客还能勉强维持。可谁想到&#xff0c;一次好心帮忙&#xff0c;竟让他发现了行业的新天地。 几年前&#xff0c;张总的一位做砂石生意…

金仓数据库永久增量备份技术原理与操作

先用一张图说明一下常见的备份方式 为什么需要永久增量备份 传统的数据库备份方案通常是间隔7天对数据库做一次全量备份&#xff08;完整备份&#xff09;&#xff0c;每天会基于全量备份做一次增量备份&#xff0c;如此循环&#xff0c;这种备份方案在全备数据量过大场景下…

19、HashTable(哈希)、位图的实现和布隆过滤器的介绍

一、了解哈希【散列表】 1、哈希的结构 在STL中&#xff0c;HashTable是一个重要的底层数据结构, 无序关联容器包括unordered_set, unordered_map内部都是基于哈希表实现 哈希表又称散列表&#xff0c;一种以「key-value」形式存储数据的数据结构。哈希函数&#xff1a;负责将…

mysql中int(1) 和 int(10) 有什么区别?

困惑 最近遇到个问题&#xff0c;有个表的要加个user_id字段&#xff0c;user_id字段可能很大&#xff0c;于是我提mysql工单​​alter table xxx ADD user_id int(1)​​。领导看到我的sql工单&#xff0c;于是说&#xff1a;这int(1)怕是不够用吧&#xff0c;接下来是一通解…

FreeRTOS如何实现100%的硬实时性?

实时系统在嵌入式应用中至关重要&#xff0c;其核心在于确保任务在指定时间内完成。根据截止时间满足的严格程度&#xff0c;实时系统分为硬实时和软实时。硬实时系统要求任务100%满足截止时间&#xff0c;否则可能导致灾难性后果&#xff0c;例如汽车安全系统或医疗设备。软实…

element-ui日期时间选择器禁止输入日期

需求解释&#xff1a;时间日期选择器&#xff0c;下方日期有禁止选择范围&#xff0c;所以上面的日期输入框要求禁止输入&#xff0c;但时间输入框可以输入&#xff0c;也就是下图效果&#xff0c;其中日历中的禁止选择可以通过【picker-options】这个属性实现&#xff0c;此属…

[论文阅读]Deeply-Supervised Nets

摘要 我们提出的深度监督网络&#xff08;DSN&#xff09;方法在最小化分类误差的同时&#xff0c;使隐藏层的学习过程更加直接和透明。我们尝试通过研究深度网络中的新公式来提升分类性能。我们关注卷积神经网络&#xff08;CNN&#xff09;架构中的三个方面&#xff1a;&…

多模态大语言模型arxiv论文略读(六十二)

MileBench: Benchmarking MLLMs in Long Context ➡️ 论文标题&#xff1a;MileBench: Benchmarking MLLMs in Long Context ➡️ 论文作者&#xff1a;Dingjie Song, Shunian Chen, Guiming Hardy Chen, Fei Yu, Xiang Wan, Benyou Wang ➡️ 研究机构: The Chinese Univers…

现代框架对SEO的深度影响

第8章&#xff1a;现代框架对SEO的深度影响 1. 引言 Next 和 Nuxt 是两个 &#x1f525;热度和使用度都最高 的现代 Web 开发框架&#xff0c;它们分别基于 ⚛️React 和 &#x1f596;Vue 构建&#xff0c;也代表了这两个生态的 &#x1f310;全栈框架。 Next 是由 Vercel 公司…

密码学--RSA

一、实验目的 1.随机生成明文和加密密钥 2.利用C语言实现素数选择&#xff08;素性判断&#xff09;的算法 3.利用C语言实现快速模幂运算的算法&#xff08;模重复平方法&#xff09; 4.利用孙子定理实现解密程序 5.利用C语言实现RSA算法 6.利用RSA算法进行数据加/解密 …