ThingsBoard源码解析-设备连接

news2025/8/13 8:00:54

整体流程

MqttTransportHandler中进行Mqtt消息处理,以AccessToken认证的设备举例,核心处理流程如下:

//MqttTransportHandler 132

processMqttMsg(ctx, (MqttMessage) msg);

 

//MqttTransportHandler 154

processConnect(ctx, (MqttConnectMessage) msg);
 

//MqttTransportHandler 474

processAuthTokenConnect(ctx, msg);
 

//MqttTransportHandler 492

//构造请求消息,调用transportService处理请求消息

transportService.process(DeviceTransportType.MQTT, request.build(),

//DefaultTransportService 271

//调用doProcess方法处理protoMsg消息,protoMsg中包含请求消息。

doProcess(transportType, protoMsg, callback);

//DefaultTransportService 283

//调用transportApiRequestTemplate的send方法处理消息

ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
 

//DefaultTbQueueRequestTemplate 180

//构造TopicPartitionInfo,使用requestTemplate(类型为TbQueueProducer)发送请求到指定消息队列的指定TOPIC中

//TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate

requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
 

//InMemoryTbQueueProducer 42 (作为requestTemplate的实现,在单体架构下调用)

//调用storage(ConcurrentHashMap+LinkedBlockingQueue)存放消息,根据结果设置callback

boolean result = storage.put(tpi.getFullTopicName(), msg);

if (result) {

    if (callback != null) {

        callback.onSuccess(null);

    }

}

//TbKafkaProducerTemplate 82 (作为requestTemplate的实现,在微服务架构下设置消息队列为kafka时使用,其他消息队列类似)

//使用Kafka生产者向Kafka Borker中发送消息,等待处理结果,然后根据异常是否为空callback对应属性。

producer.send(record, (metadata, exception) -> {

    if (exception == null) {

        if (callback != null) {

            callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));

        }

    }

});

//DefaultTbQueueResponseTemplate 116

//调用handler

//(类型为TbQueueHandler,具体实现为DefaultTransportApiService)处理请求(request),

// 异步获取响应(response)结果

AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
 

//DefaultTbQueueResponseTemplate 120

//调用responseTemplate(类型为TbQueueProducer)发送响应结果(response)到指定消息中间件的指定TOPIC中,TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate

responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);

//DefaultTbQueueRequestTemplate 94

//调用responseTemplate(类型为TbQueueConsumer)获取消息

List<Response> responses = responseTemplate.poll(pollInterval);
 

//DefaultTbQueueRequestTemplate 106

//设置future为response

expectedResponse.future.set(response);
 

//DefaultTransportService 303

//异步调用TransportServiceCallback onSuccess方法

AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
 

//MqttTransportHandler 494

//调用onValidateDeviceResponse验证返回信息

onValidateDeviceResponse(msg, ctx, connectMessage);

 

//MqttTransportHandler 646

//调用transportService处理结果消息

transportService.process(deviceSessionCtx.getSessionInfo(),DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {

       
 

//DefaultTransportService 360

//调用sendToDeviceActor处理会话信息

sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);

                   

//DefaultTransportService 760

//调用tbCoreMsgProducer(类型为TbQueueProducer)往消息中间件发送请求

tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),

 ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),wrappedCallback);

//MqttTransportHandler 651

//channel上下文中写入并刷新CONNACK消息。

ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));

设备认证示意图

  • 设备认证请求

  • 设备认证响应

由于很多太多的异步操作,导致时序图太复杂,暂时没有太好的办法绘制,有一张半成品在TIPS中可下载,感兴趣的同学可以了解下.

debug解读(以内存消息队列为例)

api请求转发配置

发送认证请求到api请求消息队列(DefaultTbQueueRequestTemplate

调用processAuthTokenConnect方法,简单处理后发送到transportService,同时传入回调方法,下文同步转异步会使用该回调方法

在doProcess方法中完成同步转异步;transportApiRequestTemplate.send(protoMsg)方法返回一个SettableFuture,设置回调结果转换和回调callback(设置回调转换和callback均不阻塞主线程,回调方法的执行通过设置future结果触发):

发送消息,缓存请求,返回Future:

发送到消息队列:

消费认证请求,并发送结果到api响应消息队列(DefaultTbQueueResponseTemplate

DefaultTbQueueResponseTemplate初始化时会开启一个子线程,循环从请求消息队列拉取请求:

查看到设备认证信息,则认证成功,构造认证成功结果并发送结果到响应消息队列

消费认证结果,并向设备端发送认证结果(DefaultTbQueueRequestTemplate

DefaultTbQueueRequestTemplate开启循环获取并处理响应结果的子线程

DefaultTbQueueRequestTemplate#mainLoop

DefaultTbQueueRequestTemplate#fetchAndProcessResponses

DefaultTbQueueRequestTemplate#doPoll

InMemoryTbQueueConsumer#poll

认证结果匹配缓存的认证请求,设置Future结果,触发回调

回调结果转换

回调callback

onValidateDeviceResponse

微服务架构下api请求是怎么异步处理

上文以内存消息队列为例介绍了设备认证过程,我们发现发送消息前会将请求缓存在内存中,等接受到响应后,再从内存中获取请求,并通知设置SettableFuture结果触发回调方法,最后再同归netty发送认证结果。但是如果微服务部署,那需要使用kafka等中间件消息队列,那怎么实现A服务发送的api请求后一定能消费到该请求的结果消息呢?接下来以kafka为例介绍一下。

TbKafkaProducerTemplate 是kafka生产者类,其中send方法,可以看到发送的消息的topic就是配置文件配置的名称,所有服务实例均发送到该topic

到kafka中查看该消息:

发现原来在消息的headers中指定了该请求要响应的topic,因此发送api请求的topic是唯一的,所有服务实例共同处理请求(集群处理),但处理完成后需要回复请求中指定的api响应topic,每个服务实例消费自己拥有的api响应消息队列。

在API请求的处理如下:

然后看发送api请求的方法:DefaultTbQueueRequestTemplate#send(Request, long),可以看到构造request请求时设置了responseTopic:

而DefaultTbQueueRequestTemplate是在KafkaTbTransportQueueFactory中构造的,可以看到reponseTemplate的topic中包含了服务ID:

服务启动时,DefaultTransportService的init方法中调用了上述方法:

在请求处理之后,可以看到含有服务ID的topic有了结果消息:

TIPS

服务ID其实就是hostname,唯一标识一台主机:

kafka等中间件的topic都是在服务中动态创建的:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/33322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

视频文件转换器有哪些?什么视频文件转换器好用?

视频承载着丰富的文字、声音、图像&#xff0c;能够多维度地调用人的感知能力&#xff0c;可以说是当今时代信息输入的重要载体。 而视频有avi、rm、rmvb、3 gp等多种格式&#xff0c;当我们使用不同设备来观看视频时&#xff0c;就涉及到视频文件格式转换这一问题&#xff0c;…

RabbitMQ系列【14】备份交换机

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 文章目录前言代码实现测试前言 在之前&#xff0c;我们分析了消息可靠性之发布确认、退回机制。当消息到达交换机后&#xff0c;但是没有找到匹配的队列时&#xff0c;退回模式&#xff08;return&…

ms10-046漏洞利用+bypassuac提权

目录 前期准备 漏洞利用 上传文件到目标主机 UAC介绍 使用bypassuac模块绕过uac进行提权。 关于钓鱼链接的拓展 前期准备 Win xp sp3关闭防火墙 实验前提 保证连通性&#xff0c;进行互ping 漏洞利用 进入msf查看需要利用的漏洞&#xff1a;ms10-046 search ms10-046 …

【Kafka】单分区单副本增加至多分区多副本

一、背景 系统&#xff1a;CentOS Linux release 7.9.2009 (Core) Kafka版本&#xff1a;2.11-2.0.0.3.1.4.0-315 [scala版本2.11&#xff1b;kafka 2.0.0版本&#xff1b;基于ambari3.1.4.0-315的版本 ] 二、现象 业务系统中总是报警&#xff1a;kafka消费延迟。 三、问题…

nodejs+vue+elementui线上买菜系统

本线上买菜系统主要包括三大功能模块&#xff0c;即管理员和用户。 &#xff08;1&#xff09;管理员模块&#xff1a;首页、个人中心、用户管理、商品分类管理、商品信息管理、系统管理、订单管理。 &#xff08;2&#xff09;前台&#xff1a;商品信息、公告信息、个人中心、…

java语言概述

目录 JDK和JRE的说明 Java语言的环境搭建 常用的DOS命令 第一个Java程序 创建java源文件 Hello.java 编译 步骤三&#xff1a;运行 总结 注 释(comment) JDK和JRE的说明 关系说明图 2、 概念说明 JDKJREJAVA开发工具&#xff08;javac.exe java.exe、javaboc.exe&…

SpringIoc依赖查找-5

1. 依赖查找的今世前生: Spring IoC容器从Java标准中学到了什么? 单一类型依赖查找 JNDI - javax.naming.Context#lookup(javax.naming.Name) JavaBeans - java.beans.beancontext.BeanContext 集合类型依赖查找 java.beans.beancontext.BeanContext 集合查找方法 层…

基于android的移动学习平台(前端APP+后端Java和MySQL)

一、需求规格说明书 1&#xff0e;概述 1.1项目目的与目标, &#xff08;1&#xff09; 项目目的&#xff1a;设计并实现网络化的在线学习系统&#xff0c;对校内课程教学进行辅助&#xff0c;为学生和教师提供一个良好的互动平台&#xff0c;方便学生课后获取学习资源和进行交…

阿里云负载均衡SLB,HTTPS动态网站部署负载均衡,实现高并发流量分发

第一步购买服务器&#xff0c;测的话一般就用按量付费几毛钱一小时 我是用了三台&#xff0c;一台是常用的服务器&#xff0c;两台临时服务器进行部署项目 2&#xff1a;服务器购买完之后&#xff0c;开始安装项目运行环境&#xff0c;我是宝塔一键按键的&#xff0c;PHP7.1。…

新知实验室-基于腾讯云音视频TRTC的微信小程序实践

前言 腾讯会议是我们常用的一款线上会议软体&#xff0c;如果想要使用&#xff0c;我们需要下载软体使用&#xff0c;相比之下&#xff0c;基于腾讯云音视频的TRTC提供了一个很好的解决方案&#xff0c;我们通过接入到小程序中来实现快捷的开始会议&#xff0c;加入会议。 TR…

[Power Query] 删除错误/空值

数据导入后&#xff0c;有可能出现错误(Error)或者空值(null) &#xff0c;我们需要对此进行删除。为此&#xff0c;本文通过讲解Power Query中的删除错误/空值操作&#xff0c;帮助大家的同时也便于日后自身的复盘学习 数据源 将数据源导入到Power BI Desktop&#xff0c;单击…

VSCode中Prettier插件依赖安装及冲突解决

文章目录一、Prettier插件安装1.1 安装Prettier插件1.2 添加Prettier配置文件1.3 配置格式化工具1.4 配置自动格式化1.5 与ESLint冲突解决二、Prettier依赖安装2.1 安装依赖2.2 配置2.3 配置指令2.4 其他配置和冲突解决一、Prettier插件安装 1.1 安装Prettier插件 通过VSCode…

[附源码]计算机毕业设计JAVA流浪动物救助系统

[附源码]计算机毕业设计JAVA流浪动物救助系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybati…

【算法基础】(一)基础算法 --- 快速排序

✨个人主页&#xff1a;bit me ✨当前专栏&#xff1a;算法基础 &#x1f525;专栏简介&#xff1a;该专栏主要更新一些基础算法题&#xff0c;有参加蓝桥杯等算法题竞赛或者正在刷题的铁汁们可以关注一下&#xff0c;互相监督打卡学习 &#x1f339; &#x1f339; &#x1f3…

游戏品类加速回暖,文娱内容持续火热——2022年IAA行业品类发展洞察系列报告·第三期

易观分析&#xff1a;易观分析联合穿山甲与巨量算数共同构建IAA发展指数&#xff0c;通过行业规模、内容热度、商业变现的多维数据指标反映行业细分品类的发展情况&#xff0c;对领域季度运行情况、热门品类进行分析解读&#xff0c;助力开发者深入洞察领域特性和发展趋势&…

【American English】美语的连读规则

文章目录连读规则1. 辅音 元音2. 辅音 辅音情形1: 相同或相近的辅音相遇情形2: 辅音 h情形3: 爆破音 [l] / [m] / [n]情形4: 爆破音 [f] / [v]情形5: 爆破音 [tf]/[]3. 元音 元音情形1: 嘴唇变平时增加 [y] 音情形2: 嘴唇变圆时增加 [w] 音4. 特殊辅音 yRef连读规则 英…

open-set recognition(OSR)开集识别

开集识别 闭集识别 ​ 训练集中的类别和测试集中的类别是一致的&#xff0c;最常见的就是使用公开数据集进行训练&#xff0c;所有数据集中的图像的类别都是已知的&#xff0c;没有未知种类的图像。传统的机器学习的算法在这些任务上已经取得了比较好的效果。 &#xff08;训…

简述供应商管理SRM系统

简道云SRM管理系统供应商关系管理(SRM系统)&#xff0c;是企业可以用来对供应商的优势和能力进行系统的、全行业范围的评估&#xff0c;涉及企业整体的商业战略&#xff0c;供应商寻源、采购审批、比价、招投标管理、订单执行、库存可视化管理、财务支付审批对账、供应商绩效评…

java--并发

并发1.java的线程状态&#xff08;1&#xff09;sleep wait的区别和联系2.线程池的核心参数3.lock 和 synchronized4.volatile能否保证线程安全5.java中的悲观锁和乐观锁6.Hashtable和ConcurrentHashMap7.对ThreadLocal1.java的线程状态 new 新建&#xff1a;普通的类&#xf…

流媒体技术基础-流媒体服务与框架

一、开源流媒体服务器 38款 流媒体服务器开源软件 主流的开源流媒体服务器及框架如下&#xff1a; 1.Live555 [RTSP拉流] 一个为流媒体提供解决方案的跨平台的C开源项目&#xff0c;它实现了对标准流媒体传输协议如 RTP/RTCP、RTSP、SIP等的支持。 实现了对多种音视频编码格…