整体流程
在MqttTransportHandler中进行Mqtt消息处理,以AccessToken认证的设备举例,核心处理流程如下:
//MqttTransportHandler 132 processMqttMsg(ctx, (MqttMessage) msg); //MqttTransportHandler 154 processConnect(ctx, (MqttConnectMessage) msg); //MqttTransportHandler 474 processAuthTokenConnect(ctx, msg); //MqttTransportHandler 492 //构造请求消息,调用transportService处理请求消息 transportService.process(DeviceTransportType.MQTT, request.build(), //DefaultTransportService 271 //调用doProcess方法处理protoMsg消息,protoMsg中包含请求消息。 doProcess(transportType, protoMsg, callback); //DefaultTransportService 283 //调用transportApiRequestTemplate的send方法处理消息 ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> { //DefaultTbQueueRequestTemplate 180 //构造TopicPartitionInfo,使用requestTemplate(类型为TbQueueProducer)发送请求到指定消息队列的指定TOPIC中 //TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() { //InMemoryTbQueueProducer 42 (作为requestTemplate的实现,在单体架构下调用) //调用storage(ConcurrentHashMap+LinkedBlockingQueue)存放消息,根据结果设置callback boolean result = storage.put(tpi.getFullTopicName(), msg); if (result) { if (callback != null) { callback.onSuccess(null); } } //TbKafkaProducerTemplate 82 (作为requestTemplate的实现,在微服务架构下设置消息队列为kafka时使用,其他消息队列类似) //使用Kafka生产者向Kafka Borker中发送消息,等待处理结果,然后根据异常是否为空callback对应属性。 producer.send(record, (metadata, exception) -> { if (exception == null) { if (callback != null) { callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata)); } } }); //DefaultTbQueueResponseTemplate 116 //调用handler //(类型为TbQueueHandler,具体实现为DefaultTransportApiService)处理请求(request), // 异步获取响应(response)结果 AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request), //DefaultTbQueueResponseTemplate 120 //调用responseTemplate(类型为TbQueueProducer)发送响应结果(response)到指定消息中间件的指定TOPIC中,TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null); //DefaultTbQueueRequestTemplate 94 //调用responseTemplate(类型为TbQueueConsumer)获取消息 List<Response> responses = responseTemplate.poll(pollInterval); //DefaultTbQueueRequestTemplate 106 //设置future为response expectedResponse.future.set(response); //DefaultTransportService 303 //异步调用TransportServiceCallback onSuccess方法 AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor); //MqttTransportHandler 494 //调用onValidateDeviceResponse验证返回信息 onValidateDeviceResponse(msg, ctx, connectMessage); //MqttTransportHandler 646 //调用transportService处理结果消息 transportService.process(deviceSessionCtx.getSessionInfo(),DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() { //DefaultTransportService 360 //调用sendToDeviceActor处理会话信息 sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
//DefaultTransportService 760 //调用tbCoreMsgProducer(类型为TbQueueProducer)往消息中间件发送请求 tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),wrappedCallback); //MqttTransportHandler 651 //channel上下文中写入并刷新CONNACK消息。 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage)); |
设备认证示意图
- 设备认证请求
- 设备认证响应
由于很多太多的异步操作,导致时序图太复杂,暂时没有太好的办法绘制,有一张半成品在TIPS中可下载,感兴趣的同学可以了解下.
debug解读(以内存消息队列为例)
api请求转发配置
发送认证请求到api请求消息队列(DefaultTbQueueRequestTemplate)
调用processAuthTokenConnect方法,简单处理后发送到transportService,同时传入回调方法,下文同步转异步会使用该回调方法
在doProcess方法中完成同步转异步;transportApiRequestTemplate.send(protoMsg)方法返回一个SettableFuture,设置回调结果转换和回调callback(设置回调转换和callback均不阻塞主线程,回调方法的执行通过设置future结果触发):
发送消息,缓存请求,返回Future:
发送到消息队列:
消费认证请求,并发送结果到api响应消息队列(DefaultTbQueueResponseTemplate)
DefaultTbQueueResponseTemplate初始化时会开启一个子线程,循环从请求消息队列拉取请求:
查看到设备认证信息,则认证成功,构造认证成功结果并发送结果到响应消息队列
消费认证结果,并向设备端发送认证结果(DefaultTbQueueRequestTemplate)
DefaultTbQueueRequestTemplate开启循环获取并处理响应结果的子线程
DefaultTbQueueRequestTemplate#mainLoop
DefaultTbQueueRequestTemplate#fetchAndProcessResponses
DefaultTbQueueRequestTemplate#doPoll
InMemoryTbQueueConsumer#poll
认证结果匹配缓存的认证请求,设置Future结果,触发回调
回调结果转换
回调callback
onValidateDeviceResponse
微服务架构下api请求是怎么异步处理
上文以内存消息队列为例介绍了设备认证过程,我们发现发送消息前会将请求缓存在内存中,等接受到响应后,再从内存中获取请求,并通知设置SettableFuture结果触发回调方法,最后再同归netty发送认证结果。但是如果微服务部署,那需要使用kafka等中间件消息队列,那怎么实现A服务发送的api请求后一定能消费到该请求的结果消息呢?接下来以kafka为例介绍一下。
TbKafkaProducerTemplate 是kafka生产者类,其中send方法,可以看到发送的消息的topic就是配置文件配置的名称,所有服务实例均发送到该topic
到kafka中查看该消息:
发现原来在消息的headers中指定了该请求要响应的topic,因此发送api请求的topic是唯一的,所有服务实例共同处理请求(集群处理),但处理完成后需要回复请求中指定的api响应topic,每个服务实例消费自己拥有的api响应消息队列。
在API请求的处理如下:
然后看发送api请求的方法:DefaultTbQueueRequestTemplate#send(Request, long),可以看到构造request请求时设置了responseTopic:
而DefaultTbQueueRequestTemplate是在KafkaTbTransportQueueFactory中构造的,可以看到reponseTemplate的topic中包含了服务ID:
服务启动时,DefaultTransportService的init方法中调用了上述方法:
在请求处理之后,可以看到含有服务ID的topic有了结果消息:
TIPS
服务ID其实就是hostname,唯一标识一台主机:
kafka等中间件的topic都是在服务中动态创建的: