Nacos源码—9.Nacos升级gRPC分析七

news2025/7/9 12:04:38

大纲

10.gRPC客户端初始化分析

11.gRPC客户端的心跳机制(健康检查)

12.gRPC服务端如何处理客户端的建立连接请求

13.gRPC服务端如何映射各种请求与对应的Handler处理类

14.gRPC简单介绍

10.gRPC客户端初始化分析

(1)gRPC客户端代理初始化的源码

(2)gRPC客户端启动的源码

(3)gRPC客户端发起与服务端建立连接请求的源码

(1)gRPC客户端代理初始化的源码

Nacos客户端注册服务实例时会调用NacosNamingService的registerInstance()方法,接着会调用NamingClientProxyDelegate的registerService()方法,然后判断注册的服务实例是不是临时的。如果注册的服务实例是临时的,那么就使用gRPC客户端代理去进行注册。如果注册的服务实例不是临时的,那么就使用HTTP客户端代理去进行注册。

NacosNamingService的init()方法在创建客户端代理,也就是执行NamingClientProxyDelegate的构造方法时,便会创建和初始化gRPC客户端代理NamingGrpcClientProxy。

创建和初始化gRPC客户端代理NamingGrpcClientProxy时,首先会由RpcClientFactory的createClient()方法创建一个RpcClient对象,并将GrpcClient对象赋值给NamingGrpcClientProxy的rpcClient属性,然后调用NamingGrpcClientProxy的start()方法启动RPC客户端连接。

在NamingGrpcClientProxy的start()方法中,会先注册一个用于处理服务端推送请求的NamingPushRequestHandler,然后调用RpcClient的start()方法启动RPC客户端即RpcClient对象,最后将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册。

public class NacosNamingService implements NamingService {
    ...
    private NamingClientProxy clientProxy;
    
    private void init(Properties properties) throws NacosException {
        ...
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }
    ...
    @Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }
    
    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法
        clientProxy.registerService(serviceName, groupName, instance);
    }
    ...
}

//客户端代理
public class NamingClientProxyDelegate implements NamingClientProxy {
    private final NamingHttpClientProxy httpClientProxy;
    private final NamingGrpcClientProxy grpcClientProxy;
    
    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
        ...
        //初始化HTTP客户端代理
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
        //初始化gRPC客户端代理
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
    }
    ...
    
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }
    
    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }
    ...
}

//gRPC客户端代理
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
    private final String namespaceId;
    private final String uuid;    
    private final Long requestTimeout;    
    private final RpcClient rpcClient;
    private final NamingGrpcRedoService redoService;
    
    //初始化gRPC客户端代理
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        super(securityProxy);
        this.namespaceId = namespaceId;
        this.uuid = UUID.randomUUID().toString();
        this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
        Map<String, String> labels = new HashMap<String, String>();
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
        labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
        //1.通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        //2.启动gRPC客户端代理NamingGrpcClientProxy
        start(serverListFactory, serviceInfoHolder);
    }
    
    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        //注册连接监听器
        rpcClient.registerConnectionListener(redoService);
        //1.注册一个用于处理服务端推送请求的NamingPushRequestHandler
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        //2.启动RPC客户端RpcClient
        rpcClient.start();
        //3.将NamingGrpcClientProxy自己作为订阅者向通知中心进行注册
        NotifyCenter.registerSubscriber(this);
    }
    ...
    
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);
        redoService.cacheInstanceForRedo(serviceName, groupName, instance);
        //执行服务实例的注册
        doRegisterService(serviceName, groupName, instance);
    }
    
    //Execute register operation.
    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        //创建请求参数对象
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);
        //向服务端发起请求
        requestToServer(request, Response.class);
        redoService.instanceRegistered(serviceName, groupName);
    }
    
    private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
        try {
            request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
            //实际会调用RpcClient.request()方法发起gRPC请求
            Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
            if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            if (responseClass.isAssignableFrom(response.getClass())) {
                return (T) response;
            }
            NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
        }
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    }
    ...
}

public class RpcClientFactory {
    private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
    ...
    
    //create a rpc client.
    public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
        if (!ConnectionType.GRPC.equals(connectionType)) {
            throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
        }
        return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
            LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
            try {
                //创建GrpcClient对象
                GrpcClient client = new GrpcSdkClient(clientNameInner);
                //设置线程核心数和最大数
                client.setThreadPoolCoreSize(threadPoolCoreSize);
                client.setThreadPoolMaxSize(threadPoolMaxSize);
                client.labels(labels);
                return client;
            } catch (Throwable throwable) {
                LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
                throw throwable;
            }
        });
    }
    ...
}

(2)gRPC客户端启动的源码

NamingGrpcClientProxy的start()方法会通过调用RpcClient的start()方法,来启动RPC客户端即RpcClient对象。

在RpcClient的start()方法中,首先会利用CAS来修改RPC客户端(RpcClient)的状态,也就是将RpcClient.rpcClientStatus属性从INITIALIZED更新为STARTING。

然后会创建一个核心线程数为2的线程池,并提交两个任务。任务一是处理连接成功或连接断开时的线程,任务二是处理重连或健康检查的线程。

接着会创建Connection连接对象,也就是在while循环中调用GrpcClient的connectToServer()方法,尝试与服务端建立连接。如果连接失败,则会抛出异常并且进行重试,由于是同步连接,所以最大重试次数是3。

最后当客户端与服务端成功建立连接后,会把对应的Connection连接对象赋值给RpcClient.currentConnection属性,并且修改RpcClient.rpcClientStatus属性即RPC客户端状态为RUNNING。

如果客户端与服务端连接失败,则会通过异步尝试进行连接,也就是调用RpcClient的switchServerAsync()方法,往RpcClient的reconnectionSignal队列中放入一个ReconnectContext对象,reconnectionSignal队列中的元素会交给任务2来处理。

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
    protected ScheduledExecutorService clientEventExecutor;
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
    //在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性
    protected volatile Connection currentConnection;
    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
    ...
    
    public final void start() throws NacosException {
        //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }

        //接下来创建调度线程池执行器,并提交两个任务
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
    
        //任务1:处理连接成功或连接断开时的线程
        clientEventExecutor.submit(() -> {
            ...     
        });

        //任务2:处理重连或健康检查的线程
        clientEventExecutor.submit(() -> {
            ...
        });

        //创建连接对象
        Connection connectToServer = null;
        rpcClientStatus.set(RpcClientStatus.STARTING);

        //重试次数为3次
        int startUpRetryTimes = RETRY_TIMES;
        //在while循环中尝试与服务端建立连接,最多循环3次
        while (startUpRetryTimes > 0 && connectToServer == null) {
            try {
                startUpRetryTimes--;
                //获取服务端信息
                ServerInfo serverInfo = nextRpcServer();
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
                //调用GrpcClient.connectToServer()方法建立和服务端的长连接
                connectToServer = connectToServer(serverInfo);
            } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);
            }
        }

        //如果连接成功,connectToServer对象就不为空
        if (connectToServer != null) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
            //连接对象赋值,currentConnection其实就是一个在客户端使用的GrpcConnection对象实例
            this.currentConnection = connectToServer;
            //更改RPC客户端RpcClient的状态
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            //往eventLinkedBlockingQueue队列放入ConnectionEvent事件
            eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        } else {
            //尝试进行异步连接
            switchServerAsync();
        }

        registerServerRequestHandler(new ConnectResetRequestHandler());    
        //register client detection request.
        registerServerRequestHandler(request -> {
            if (request instanceof ClientDetectionRequest) {
                return new ClientDetectionResponse();
            }
            return null;
        });
    }
    
    protected ServerInfo nextRpcServer() {
        String serverAddress = getServerListFactory().genNextServer();
        //获取服务端信息
        return resolveServerInfo(serverAddress);
    }
    
    private ServerInfo resolveServerInfo(String serverAddress) {
        Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress);
        if (matcher.find()) {
            serverAddress = matcher.group(1);
        }
        String[] ipPortTuple = serverAddress.split(Constants.COLON, 2);
        int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848"));
        String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort));
        return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort));
    }
    
    public void switchServerAsync() {
        //异步注册逻辑
        switchServerAsync(null, false);
    }
    
    protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
        //往reconnectionSignal队列里放入一个对象
        reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
    }
    ...
}

(3)gRPC客户端发起与服务端建立连接请求的源码

gRPC客户端与服务端建立连接的方法是GrpcClient的connectToServer()方法。该方法首先会获取进行网络通信的端口号,因为gRPC服务需要额外占用一个端口的,所以这个端口号是在Nacos的8848基础上 + 偏移量1000,变成9848。

在建立连接之前,会先检查一下服务端,如果没问题才发起连接请求,接着就会调用GrpcConnection的sendRequest()方法发起连接请求,最后返回GrpcConnection连接对象。

public abstract class GrpcClient extends RpcClient {
    ...
    @Override
    public Connection connectToServer(ServerInfo serverInfo) {
        try {
            if (grpcExecutor == null) {
                this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
            }
            //获取端口号:gRPC服务需要额外占用一个端口的,这个端口是在Nacos 8848的基础上,+ 偏移量1000,所以是9848
            int port = serverInfo.getServerPort() + rpcPortOffset();
            RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
            if (newChannelStubTemp != null) {
                //检查一下服务端,没问题才会发起RPC连接请求
                Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
                if (response == null || !(response instanceof ServerCheckResponse)) {
                    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                    return null;
                }
            
                BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());
                //创建连接对象
                GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
                grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
            
                //create stream request and bind connection event to this connection.
                //创建流请求并将连接事件绑定到此连接
                StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
            
                //stream observer to send response to server
                grpcConn.setPayloadStreamObserver(payloadStreamObserver);
                grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
                grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
                //send a  setup request.
                ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
                conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
                conSetupRequest.setLabels(super.getLabels());
                conSetupRequest.setAbilities(super.clientAbilities);
                conSetupRequest.setTenant(super.getTenant());
                //发起连接请求
                grpcConn.sendRequest(conSetupRequest);
                //wait to register connection setup
                Thread.sleep(100L);
                return grpcConn;
            }
            return null;
        } catch (Exception e) {
            LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
        }
        return null;
    }
    
    private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
        try {
            if (requestBlockingStub == null) {
                return null;
            }
            ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
            Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
            //向服务端发送一个检查请求
            ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
            Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
            //receive connection unregister response here,not check response is success.
            return (Response) GrpcUtils.parse(response);
        } catch (Exception e) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
            return null;
        }
    }
    
    private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) {
        //调用BiRequestStreamStub.requestBiStream()方法连接服务端
        return streamStub.requestBiStream(new StreamObserver<Payload>() {
            @Override
            public void onNext(Payload payload) {
                LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString());
                try {
                    Object parseBody = GrpcUtils.parse(payload);
                    final Request request = (Request) parseBody;
                    if (request != null) {
                        try {
                            Response response = handleServerRequest(request);
                            if (response != null) {
                                response.setRequestId(request.getRequestId());
                                sendResponse(response);
                            } else {
                                LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(), request.getRequestId());
                            }
                        } catch (Exception e) {
                            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage());
                            Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR, "Handle server request error");
                            errResponse.setRequestId(request.getRequestId());
                            sendResponse(errResponse);
                        }
                    }
                } catch (Exception e) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
                }
            }
            
            @Override
            public void onError(Throwable throwable) {
                boolean isRunning = isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable);
                    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }
            
            @Override
            public void onCompleted() {
                boolean isRunning = isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId());
                    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }
        });
    }
    ...
}

(4)总结

11.gRPC客户端的心跳机制(健康检查)

(1)线程任务一:处理连接成功或连接断开时的通知

(2)线程任务二:处理重连或健康检查

RpcClient的start()方法会调用GrpcClient的connectToServer()方法连接服务端,不管连接是否成功,最后都会往不同的阻塞队列中添加事件。

如果连接成功,那么就往RpcClient的eventLinkedBlockingQueue添加连接事件。如果连接失败,那么就往RpcClient的reconnectionSignal队列添加重连对象。而这两个阻塞队列中的数据处理,便是由执行RpcClient的start()方法时启动的两个线程任务进行处理的。

(1)线程任务一:处理连接成功或连接断开时的通知

这个任务主要在连接成功或者连接断开时,修改一些属性状态。通过eventLinkedBlockingQueue的take()方法从队列取到连接事件后,会判断连接事件是否建立连接还是断开连接。

如果是建立连接,那么就调用RpcClient的notifyConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为true。

如果是断开连接,那么就调用RpcClient的notifyDisConnected()方法,把执行NamingGrpcClientProxy的start()方法时所注册的NamingGrpcRedoService对象的connected属性设置为false。

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
    protected ScheduledExecutorService clientEventExecutor;
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
    //listener called where connection's status changed. 连接状态改变的监听器
    protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
    ...
    
    public final void start() throws NacosException {
        //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }

        //接下来创建调度线程池执行器,并提交两个任务
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
    
        //任务1:处理连接成功或连接断开时的线程
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take;
                try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
                } catch (Throwable e) {
                    // Do nothing
                }
            }   
        });

        //任务2:向服务端上报心跳或重连的线程
        clientEventExecutor.submit(() -> {
            ...
        });
    }
    ...
                                   
    //Notify when client new connected.
    protected void notifyConnected() {
        if (connectionEventListeners.isEmpty()) {
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
        for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
            try {
                connectionEventListener.onConnected();
            } catch (Throwable throwable) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName());
            }
        }
    }
    
    //Notify when client disconnected.
    protected void notifyDisConnected() {
        if (connectionEventListeners.isEmpty()) {
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
        for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
            try {
                connectionEventListener.onDisConnect();
            } catch (Throwable throwable) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName());
            }
        }
    }
    ...
    
    //Register connection handler. Will be notified when inner connection's state changed.
    //在执行NamingGrpcClientProxy.start()方法时会将NamingGrpcRedoService对象注册到connectionEventListeners中
    public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName());
        this.connectionEventListeners.add(connectionEventListener);
    }
    ...
}

public class NamingGrpcRedoService implements ConnectionEventListener {
    private volatile boolean connected = false;
    ...
    
    @Override
    public void onConnected() {
        connected = true;
        LogUtils.NAMING_LOGGER.info("Grpc connection connect");
    }
    
    @Override
    public void onDisConnect() {
        connected = false;
        LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
        synchronized (registeredInstances) {
            registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
        }
        synchronized (subscribes) {
            subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
        }
        LogUtils.NAMING_LOGGER.warn("mark to redo completed");
    }
    ...
}

(2)线程任务二:处理重连或健康检查

如果RpcClient的start()方法在调用GrpcClient的connectToServer()方法连接服务端时失败了,那么会往RpcClient.reconnectionSignal队列添加重连对象的,而这个任务就会获取reconnectionSignal队列中的重连对象进行重连。

因为reconnectionSignal中的数据是当连接失败时放入的,所以如果从reconnectionSignal中获取不到重连对象,等同于连接成功。

注意:这个任务从reconnectionSignal阻塞队列中获取重连对象时,调用的是阻塞队列的take()方法,而不是阻塞队列的poll()方法。BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态。BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null。

情况一:如果从reconnectionSignal队列中获取到的重连对象为null

首先判断存活时间是否大于 5s,如果大于则调用RpcClient.healthCheck()方法发起健康检查的RPC请求。健康检查的触发方法是currentConnection.request()方法,健康检查的请求类型是HealthCheckRequest。

如果健康检查成功,只需刷新存活时间即可。如果健康检查失败,则需要尝试与服务端重新建立连接。

情况二:如果从reconnectionSignal队列中获取到的重连对象不为null

那么就调用RpcClient的reconnect()方法进行重新连接,该方法会通过GrpcClient的connectToServer()方法尝试与服务端建立连接。

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);
    protected ScheduledExecutorService clientEventExecutor;
    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
    ...
    
    public final void start() throws NacosException {
        //利用CAS来修改RPC客户端(RpcClient)的状态,从INITIALIZED更新为STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }

        //接下来创建调度线程池执行器,并提交两个任务
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
    
        //任务1:处理连接成功或连接断开时的线程
        clientEventExecutor.submit(() -> {
            ...     
        });

        //任务2:向服务端上报心跳或重连的线程
        clientEventExecutor.submit(() -> {
            while (true) {
                try {
                    if (isShutdown()) {
                        break;
                    }
                    //这里从reconnectionSignal阻塞队列中获取任务不是调用take()方法,而是调用poll()方法,并且指定了5s的最大读取时间
                    //BlockingQueue的take()方法,如果读取不到数据,会一直处于阻塞状态
                    //BlockingQueue的poll()方法,在指定的时间内读取不到数据,会返回null
                    ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);

                    //reconnectContext为null,说明从reconnectionSignal中获取不到数据
                    //由于reconnectionSignal中的数据是当连接失败时放入的
                    //所以从reconnectionSignal中获取不到数据,等同于连接成功
                    if (reconnectContext == null) {
                        //check alive time.
                        //检查存活时间,默认存活时间为5s,超过5s就需要做健康检查
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                            //调用RpcClient.healthCheck()方法,发起健康检查请求
                            boolean isHealthy = healthCheck();
                            //如果向服务端发起健康检查请求失败,则需要尝试重新建立连接
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId());
                                //判断连接状态是否关闭,如果是则结束异步任务
                                RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                                if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                    break;
                                }
                                //修改RpcClient的连接状态为不健康
                                boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                                //给reconnectContext属性赋值,准备尝试重连
                                if (statusFLowSuccess) {
                                    //重新赋值,注意这里没有continue,所以逻辑会接着往下执行
                                    reconnectContext = new ReconnectContext(null, false);
                                } else {
                                    continue;
                                }
                            } else {
                                //如果向服务端发起健康检查请求成功,则刷新RpcClient的存活时间
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }
                    }
                    
                    if (reconnectContext.serverInfo != null) {
                        //clear recommend server if server is not in server list.
                        //如果服务器不在服务器列表中,则清除推荐服务器,即设置reconnectContext.serverInfo为null

                        boolean serverExist = false;
                        //遍历服务端列表
                        for (String server : getServerListFactory().getServerList()) {
                            ServerInfo serverInfo = resolveServerInfo(server);
                            if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                                serverExist = true;
                                reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                                break;
                            }
                        }
                        //reconnectContext.serverInfo不存在服务端列表中,就清除服务器信息,设置reconnectContext.serverInfo为null
                        if (!serverExist) {
                            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress());
                            reconnectContext.serverInfo = null;
                        }
                    }
                    //进行重新连接,RpcClient.reconnect()方法中会调用GrpcClient.connectToServer()方法尝试与服务端建立连接
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    //Do nothing
                }
            }
        });
    }
    
    private boolean healthCheck() {
        HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
        if (this.currentConnection == null) {
            return false;
        }
        try {
            //利用currentConnection连接对象,发起RPC请求,请求类型是HealthCheckRequest
            Response response = this.currentConnection.request(healthCheckRequest, 3000L);
            //not only check server is ok, also check connection is register.
            return response != null && response.isSuccess();
        } catch (NacosException e) {
            //ignore
        }
        return false;
    }
    ...
}

(3)总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375268.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机视觉】基于深度学习的实时情绪检测系统:emotion-detection项目深度解析

基于深度学习的实时情绪检测系统&#xff1a;emotion-detection项目深度解析 1. 项目概述2. 技术原理与模型架构2.1 核心算法1) 数据预处理流程2) 改进型MobileNetV2 2.2 系统架构 3. 实战部署指南3.1 环境配置3.2 数据集准备3.3 模型训练3.4 实时推理 4. 常见问题与解决方案4.…

【图像处理基石】什么是油画感?

在图像处理中&#xff0c;“油画感”通常指图像呈现出类似油画的块状纹理、笔触痕迹或色彩过渡不自然的现象&#xff0c;表现为细节模糊、边缘不锐利、颜色断层或人工纹理明显。这种问题常见于照片处理、视频帧截图或压缩后的图像&#xff0c;本质是画质受损的一种表现。以下是…

AD PCB布线的常用命令

PCB布线顺序&#xff1a;先信号&#xff0c;再电源&#xff0c;再GNG 1.多根走线的应用 将IC上的引脚分类 更改一类引脚以及引线的颜色&#xff0c;画出走线&#xff08;将脚引出&#xff09; 选中这些走线&#xff0c;点击‘交互式总线布线’&#xff0c;便可以多根拉线 shi…

【3-2】HDLC

前言 前面我们提到了 PSTN&#xff08;Public Switched Telephone Network&#xff09; &#xff0c;今天介绍一种很少见的数据链路层的协议&#xff0c;HDLC&#xff01; 文章目录 前言1. 定义2. 帧边界3. 零比特填充4. 控制字段4.1. 信息帧&#xff08;I帧&#xff09;4.2. …

MySQL 学习(八)如何打开binlog日志

目录 一、默认状态二、如何检查 binlog 状态三、如何开启 binlog3.1 临时开启&#xff08;重启后失效&#xff09;3.2 永久开启&#xff08;需修改配置文件&#xff09;3.3 验证是否开启成功3.4 查看 binlog 内容 四、高级配置建议五、注意事项六、开启后的日常维护 知识回顾&a…

OpenCV进阶操作:光流估计

文章目录 前言一、光流估计1、光流估计是什么&#xff1f;2、光流估计的前提&#xff1f;1&#xff09;亮度恒定2&#xff09;小运动3&#xff09;空间一致 3、OpenCV中的经典光流算法1&#xff09;Lucas-Kanade方法&#xff08;稀疏光流&#xff09;2&#xff09; Farneback方…

4. 文字效果/2D-3D转换 - 3D翻转卡片

4. 文字效果/2D-3D转换 - 3D翻转卡片 案例&#xff1a;3D产品展示卡片 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><style type"text/css">.scene {width: 300px;height…

【AI News | 20250513】每日AI进展

AI Repos 1、iap-diffusion-labs 从零开始带我们构建完整的扩散模型。通过三个精心设计的实验练习&#xff0c;循序渐进地引导我们实现流匹配和扩散模型&#xff0c;从基础 SDE 到条件图像生成&#xff0c;每一步都有详尽指导和完整代码&#xff0c;让复杂理论简单易懂。主要内…

mybatisplus 集成逻辑删除

一开始&#xff0c;没去查资料&#xff0c;后面要被AI气死了&#xff0c;先看它的的话 一开始&#xff0c;看ai的描述&#xff0c;我还以为&#xff0c;不需要改数据库&#xff0c;mybatis-puls自动拦截集成就可以实现逻辑删除&#xff0c;c&#xff0c;最后还是要给数据库加一…

SimScape物理建模实例2--带控制的单质量弹簧阻尼系统

模型下载&#xff1a; 基于simscape&#xff0c;单质量系统带位置控制资源-CSDN文库 在实例1中&#xff0c;我们搭建了不带控制的单质量弹簧阻尼系统&#xff0c;该系统没有外界力量介入&#xff0c;只有弹簧的初始弹力&#xff0c;带着弹簧使劲弹来弹去。 SimScape物理建模实…

PyGame游戏开发(含源码+演示视频+开结题报告+设计文档)

前言&#xff1a; 大二小学期python课上基于pygame做的一个游戏小demo&#xff0c;当时老师花了一天讲解了下python基础语法后&#xff08;也是整个大学四年唯一学习python的时间&#xff09;&#xff0c;便让我们自学网课一周然后交项目&#xff0c;所以做的非常仓促&#xff…

拒绝flash插件打劫!如何在vscode上玩4399小游戏

现在电脑上玩4399都需要flash插件了 这也导致了很多人无法玩到小时候的游戏 今天介绍一款插件 功能强大 即安即玩 首先打开vscode 点开小方框&#xff08;拓展&#xff09;搜索4399 认准4399 on vscode点击安装 安装完毕后 按下 Ctrl Shift P , 输入 4399 on VSCode 或…

learning ray之ray核心设计和架构

我们每天都在处理海量、多样且高速生成的数据&#xff0c;这对计算能力提出了前所未有的挑战。传统的单机计算模式在面对日益复杂的机器学习模型和大规模数据集时&#xff0c;往往显得力不从心。更重要的是&#xff0c;数据科学家们本应专注于模型训练、特征工程、超参数调优这…

C语言while循环的用法(非常详细,附带实例)

while 是 C 语言中的一种循环控制结构&#xff0c;用于在特定条件为真时重复执行一段代码。 while 循环的语法如下&#xff1a; while (条件表达式) { // 循环体&#xff1a;条件为真时执行的代码 } 条件表达式&#xff1a;返回真&#xff08;非 0&#xff09;或假&#x…

JavaScript进阶(九)

第三部分:JavaScript进阶 目录 第三部分:JavaScript进阶 一、作用域 1.1 局部作用域 1. 作用域 2. 局部作用域 函数作用域 块作用域 1.2 全局作用域 1.3 作用域链 1.4 JS垃圾回收机制 1. 什么是垃圾回收机制 2. 内存的声明周期 3. 垃圾回收的算法说明 引用计数…

数据结构与算法分析实验11 实现顺序查找表

实现顺序查找表 1.上机名称2.上机要求3.上机环境4.程序清单(写明运行结果及结果分析)4.1 程序清单4.1.1 头文件4.1.2 实现文件4.1.3 源文件 4.2 实现展效果示 上机体会 1.上机名称 实现顺序查找表 顺序查找表的基本概念 顺序查找表是一种线性数据结构&#xff0c;通常用于存储…

获取高德地图JS API的安全密钥和Key的方法

要使用高德地图JavaScript API&#xff0c;您需要获取API Key和安全密钥(securityJsCode)。以下是获取步骤&#xff1a; 1. 注册高德开放平台账号 首先访问高德开放平台&#xff0c;如果没有账号需要先注册。 2. 创建应用获取Key 登录后进入"控制台" 点击"应…

JAVA研发+前后端分离,ZKmall开源商城B2C商城如何保障系统性能?

在电商行业竞争白热化的当下&#xff0c;B2C 商城系统的性能表现成为决定用户留存与商业成败的关键因素。ZKmall 开源商城凭借 Java 研发与前后端分离架构的深度融合&#xff0c;构建起一套高效、稳定且具备强大扩展性的系统架构&#xff0c;从底层技术到上层应用全方位保障性能…

嵌入式自学第二十天(5.13)

&#xff08;1&#xff09;线性表顺序存储的优缺点&#xff1a; 优点&#xff1a;无需为表中逻辑关系添加额外存储空间&#xff1b; 可以快速随机访问元素&#xff0c;时间复杂度O(1)。 缺点&#xff1a;插入删除需要移动元素O(n&#xff09;&#xff1b; 无法动态存储。 …

快速上手Linux nfs网络文件系统

一、nfs服务的安装与部属 1.安装软件 设置火墙 测试&#xff1a;在客户端上安装nfs-utils后 showmount 服务端IP 2.共享资源 测试&#xff1a; 参数&#xff08;参数写在共享策略文件的括号里&#xff09; 二、nfs客户端动态挂载机制 当客户端和服务器之间没有数据交互时&am…