ThingsBoard源码解析-消息队列

news2025/7/12 15:29:30

概述

消息队列是thingsboard支持集群的基础,比如使用Kafka可实现消息在整个服务集群中共同处理,提高性能。如果是内存队列,则无法再服务实例间共享消息。

定义

在module【common/cluster-api】的org.thingsboard.server.queue包定义了消息队列,其中接口

TbQueueProducer定义了生产者,实现类有:

接口TbQueueConsumer定义了消费者,实现类有:

可以看到除了抽象类外,生产者和消费者的具体实现时一一对应的,应用在启用根据配置

queue.type确定哪种实现,默认是内存

使用

生产者

在module【common】的package【transport】中实现了各种不同的传输协议,无论哪种协议当接收到遥测数据时,都会通过抽象父类【DefaultTransportService】的process方法处理:

其中resolve tpi的方法,通过TB_RULE_ENGINT解析出topic是 tb_rule_engine

消费者

当配置consumer-per-partition=true时,则开启了分区模式,分区可均分给消费者,实现集群处理;如果为开启分区模式,则一个队列就是一个队列,每个服务都完全消费自己的队列。

分区数通过partitions参数配置,默认是10

无分区模式

消费者服务类【DefaultTbRuleEngineConsumerService】,其init方法是PostContruct方法

根据消息队列的配置加载所有队列的所有分区。

默认有Main队列,因此会加载所有队列的消费者并存入consumers列表;

而DefaultTbRuleEngineConsumerService的抽象父类【AbstractConsumerService】中定义了应用启动完成事件【ApplicationReadyEvent】监听方法,在应用就绪后被执行

因此服务启动后会自动执行DefaultTbRuleEngineConsumerService的lauchMainConsumers方法:

在线程池中异步执行consumerLoop方法

consumerLoop方法中consumer始终循环拉取数据,然后再消费,当consumer stop时会结束循环。

分区模式(默认)

TB可使用Zookeeper实现服务发现,默认情况下ZK是未开启的

当不开启zk时,是无法感知其他服务实例的,此时每个服务都会消费全部分区;

当开启zk后,能感知其他服务实例,才会真正的实现独立处理。

注意当开启zk后,就不能再使用内存型消息队列了,否则会导致消息虽然发送到了(内存)队列的某个分区,但是该分区应该被其他服务实例消费,因此该消息将永远不会被消费。

实现类是ZkDiscoveryService:

服务启动

服务启动完成后,想zk注册当前服务,使得其他服务实例可以发现新增服务实例事件:

均匀分配分区

通过取余将队列均匀的分配给所有服务节点;

例如全部服务节点有2个,节点ID分别是:s0,s1

当判断p1由谁消费时,1%2=1,则由s1消费

当判断p4由谁消费时,4%2=0,则由s0消费

新增服务实例事件处理

ZkDiscoveryService中定义了当收到zk child path变化事件时则回调recalculatePartitions方法进行重新分区:

先debug模式启动一个服务实例A,然后再另外启动一个服务B,服务A收到新服务启动的事件,接下来debug看一下分区是如何重新分配的,首先可以看到oldPartions中每个队列下的value都是10个,即10个分区:

而经过重新分配后,还属于当前服务消费的分区都变成了5个:

然后发送分区变化事件到消费者服务:

消费者服务(DefaultTbRuleEngineConsumerService)收到事件:

将要所有需要消费的分区,存储到临时队列subscribeQueue中

通过将当前消费分区有权消费分区比较,计算需要增加消费的分区:

当前消费分区是10个,而重新分配后有权消费的分区减半,只剩5个,因此不需要增加,所以计算后addedPartitions是空的:

通过将当前消费分区和有权消费分区比较,计算需要移除消费的分区,由于现在有权消费的只剩5个分区(另外5个被其他服务实例消费),因此计算后removedPartitions中有5个分区

然后对这个5个要移除消费的分区执行removeConsumerForTopicByTpi

因为消费者poll是在一个循环中不断执行的,当设置consumer stop时,则退出循环。

API请求异步处理

设备连接也是基于消息队列实现异步的,详情参考《设备连接debug解析 》

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/36592.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排名预测系统

排名预测系统 题目链接 题目背景: 本题大模拟来自真实的需求,即:综合三场网络赛的名次,来预计一个正式队伍在所有正式参赛队伍中的名次以此来估计自己能不能拿牌。本来只有一道题,即为你们看到的T5,经过…

【Linux kernel/cpufreq】framework ----big Little driver

Linux kernel支持ARM bigLttile框架的解决方案 一般ARM SOC包含能效和性能两个cluster,共8个 core,可以把这8个core统统开放给kernel,让kernel的调度器(scheduler)根据系统的实际情况,决定哪些任务应该在哪…

C++ 值传递、引用传递、指针传递

一、简介 参数传递的三种方式&#xff1a;值传递、引用传递、指针传递 二、举例如下 #if 1 值传递 引用传递 指针传递的区别void value_input(int a){cout << "值传递------函数" <<&a <<endl;a 100;}void Pointer_input(int * n){cou…

云上办公兴起,华为云桌面Workspace更靠谱

云上办公兴起&#xff0c;华为云桌面Workspace更靠谱 为了办公的便利性&#xff0c;也趁着华为云推行“实惠更实用&#xff0c;11都如愿”的主题活动&#xff0c;许多企业果断入手了华为云桌面Workspace服务&#xff0c;当亲自试用后&#xff0c;才逐渐感受使用华为云桌面Work…

FPGA+ARM异核架构,基于米尔MYC-JX8MMA7核心板的全自动血细胞分析仪

全自动血细胞分析仪是医院临床检验应用非常广泛的仪器之一&#xff0c;用来检测红细胞、血红蛋白、白细胞、血小板等项目。是基于电子技术和自动化技术的全自动智能设备&#xff0c;功能齐全&#xff0c;操作简单&#xff0c;依托相关计算机系统在数据处理和数据分析等方面具有…

脚气、灰指甲治疗实验方案

脚气 &#xff08;已临床实验&#xff09; 脚气&#xff0c;又叫足廯、香港脚。 糜烂性脚气 症状&#xff1a;80%都是这种类型。常见于多汗人群。角质层被汗水浸软&#xff0c;发白了以后&#xff0c;走动不断摩擦表皮脱落&#xff0c;露出鲜红色糜烂面&#xff0c;瘙痒剧烈&…

什么是分布式软件系统

:什么是分布式软件系统&#xff1f;分布式软件系统是什么意思&#xff1f; 分布式软件系统(Distributed Software Systems)是支持分布式处理的软件系统,是在由通信网络互联的多处理机体系结构上执行任务的系统。它包括分布式操作系统、分布式程序设计语言及其编译(解释)系统、分…

[附源码]java毕业设计疫情状况下生活物资集体团购系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[毕业设计]大数据电影数据分析可视化

目录 前言 课题背景和意义 实现技术思路 网页分析 索引页 详情页 反爬破解 实现效果图样例 前言 &#x1f4c5;大四是整个大学期间最忙碌的时光,一边要忙着备考或实习为毕业后面临的就业升学做准备,一边要为毕业设计耗费大量精力。近几年各个学校要求的毕设项目越来越难,…

Unity UI 框架相关的一些思考

开源地址&#xff1a; GitHub - NRatel/NRFramework.UI: 基于 Unity UGUI 的 UI 开发框架基于 Unity UGUI 的 UI 开发框架. Contribute to NRatel/NRFramework.UI development by creating an account on GitHub.https://github.com/NRatel/NRFramework.UI 简介&#xff1a;…

EMR-StarRocks 与 Flink 在汇量实时写入场景的最佳实践

作者&#xff1a; 刘腾飞 汇量后端开发工程师 阿里云开源OLAP研发团队 EMR-StarRocks介绍 阿里云EMR在年初推出了StarRocks服务&#xff0c;StarRocks是新一代极速全场景MPP&#xff08;Massively Parallel Processing&#xff09;数据仓库&#xff0c;致力于构建极速和统一分…

帝国cms后台登录系统限制次数,60分钟过后重新登录解决办法

帝国cms后台登录系统一不小心登录频繁就提示: 系统限制的登录次数不得超过5次,请等60分钟过后,方可重新登录 主要原因就是频繁的输错用户名或者密码导致登录受限 解帝国cms后台登录系统限制次数方法一:等待60分钟,然后再尝试登录 解帝国cms后台登录系统限制次数方法二:修改…

Hive之DQL操作

Hive系列第六章 &#xff08;实际是第七篇&#xff0c;就不改目录序号了&#xff0c;大家知道就行&#xff0c;后续的篇章类推即可&#xff09; 第六章 DQL查询数据 DDL&#xff1a; Data Definition Language 数据定义语言 DML&#xff1a; Data Manipulation Language …

【科学文献计量】GC.networkCoInvestigator()和GC.networkCoInvestigator()中的参数解释

@TOC 1 数据 使用官网提供的基金数据导入到python环境中 2 GC.networkCoInvestigator()中的参数解释 GC.networkCoInvestigator()中的参数解释: targetTagsL: [list]数据类型。默认为None,可以指定为Grant中研究者的标签构成的列表,很多基金中作者没有已知的标签,需要自…

最新版本EasyRecovery15个人免费版电脑数据恢复工具

最新版本EasyRecovery15是一款是款恢复率高、速度快的数据恢复软件&#xff0c;Ontrack EasyRecovery (易恢复) 跨平台支持 Windows 以及 Mac 系统&#xff0c;能能够顺利找回因各种原因丢失的文件&#xff0c;比如文件误删除、误格式化、分区丢失等&#xff0c;且EasyRecovery…

一种获得离散型周期数据的变化周期的算法

400个数据像这样&#xff1a; 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 109 152 155 155 237 24 27 27 …

【Android 开发】 面试官刨根问底?教你如何避免翻车沟通表达能力

很久以前&#xff0c;凭借四大组件、Java基础等知识&#xff0c;便可开开心心的开发&#xff0c;轻松的上岗&#xff1b; 而随着Android的不断发展完善&#xff0c;各种组件库越来越成熟&#xff0c;学习资料越来越多&#xff0c;我们却慢慢的看不到了方向&#xff1b;信息爆炸…

Servlet(Cookie和Session)

目录 &#x1f432; 1. Cookie 的工作流程 &#x1f432; 2. Servlet中操作 Cookie 和 Session 的api &#x1f432; 3. 案例1: 模拟登录 &#x1f432; 4. 上传文件 &#x1f432; 5. 案例2: 上传文件 &#x1f432; 1. Cookie 的工作流程 Cookie 是浏览器在本地持久化保…

[附源码]SSM计算机毕业设计朋辈帮扶系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

[毕业设计]opencv机器学习双目测距精度测量系统

前言 &#x1f4c5;大四是整个大学期间最忙碌的时光,一边要忙着备考或实习为毕业后面临的就业升学做准备,一边要为毕业设计耗费大量精力。近几年各个学校要求的毕设项目越来越难,有不少课题是研究生级别难度的,对本科同学来说是充满挑战。为帮助大家顺利通过和节省时间与精力投…