描述
规则引擎是Thingsboard的核心部分,基于Actor编程模型,类似事件驱动;
每个actor都有自己的消息队列(mailBox)保存接收到的消息
actor可以创建actor
actor可以将消息转发给其他actor
分析
Actor模型实现
系统中与Actor模型相关类都在工程common/actor下,几个核心类说明如下:
- TbActorSystem Actor系统接口,Actor系统类实现该接口
- TbActor Actor接口,所有Actor需直接或间接实现该接口
- TbActorCreator Actor创建接口,所有Actor创建器直接或间接实现该接口
- TbActorRef Actor句柄接口,使用TbActorCreator创建Actor后返回此句柄,通常指向Actor的邮箱。
- TbActorCtx Actor上下文接口,继承TbActorRef接口。
- AbstractTbActor 抽象Actor,实现TbActor接口。
- DefaultTbActorSystem Actor系统,用于Dispatcher的创建删除、Actor的创建查找和Actor之间消息传递等。
- Dispatcher 调度器,用于调度Actor的创建或消息分发。
- TbActorMailbox 邮箱,实现TbActorCtx接口,指向某个actor,同时存储消息到队列并使用调度器处理队列中的消息。
类继承关系图如下:
TbActorMailBox中有两个队列用于存储待处理的消息:
两个队列的类型是ConcurrentLinkedQueue,非阻塞并发队列,减少了线程切换,性能好。
基于以上类,实现一个Actor代码如下:
Plain Text public class TbMyActor extends AbstractTbActor { public TbMyActor() { }
@Override public boolean process(TbActorMsg msg) { //process some message return false; } /** * use this to create Actor */ public static class ActorCreator implements TbActorCreator { public ActorCreator() { } @Override public TbActorId createActorId() { return new TbEntityActorId(new TenantId(EntityId.NULL_UUID)); } @Override public TbActor createActor() { return new TbMyActor(); } } } |
类继承关系如下:
引擎初始化
回到正题,DefaultActorService构造一个使用Actor模型系统的规则引擎,分为两个阶段:
阶段1:类初始化,方法:initActorSystem
Plain Text //DefaultActorService 84 log.info("Initializing actor system."); actorContext.setActorService(this); TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts); //新建DefaultTbActorSystem对象 system = new DefaultTbActorSystem(settings); //创建线程池用于后续异步处理消息 system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize)); system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize)); system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize)); system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize)); actorContext.setActorSystem(system); //创建整个Actor模型的根 appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext)); actorContext.setAppActor(appActor); //创建状态Actor,也是一个根,用于统计状态 TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor")); actorContext.setStatsActor(statsActor); log.info("Actor system initialized."); |
默认会将全部租户下的全部规则节点加载到内存中;可通过配置【TB_SERVICE_TENANT_ID】 指定服务专用租户,即只会加载该租户下的规则节点
阶段2:应用准备完成,方法为onApplicationEvent
Plain Text //DefaultActorService 120 log.info("Received application ready event. Sending application init message to actor system"); //给顶层AppActor邮箱发送消息AppInitMsg appActor.tellWithHighPriority(new AppInitMsg()); |
AppActor收到消息后,在doProcess方法中进行处理
Plain Text //AppActor 67 if (!ruleChainsInitialized) { //初始化多个租户Actors initTenantActors(); ruleChainsInitialized = true; if (msg.getMsgType() != MsgType.APP_INIT_MSG) { log.warn("Rule Chains initialized by unexpected message: {}", msg); } } |
TenantActor在init阶段进行租户下规则链RuleChainActor创建
Plain Text // TenantActor 88 if (isRuleEngineForCurrentTenant) { try { if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenantProfile.isIsolatedTbRuleEngine())) { if (apiUsageState.isReExecEnabled()) { log.info("[{}] Going to init rule chains", tenantId); //规则链节点初始化 initRuleChains(); } else { log.info("[{}] Skip init of the rule chains due to API limits", tenantId); } } else { isRuleEngineForCurrentTenant = false; } } catch (Exception e) { cantFindTenant = true; } } |
RuleChainActor在init阶段,创建RuleChainActorMessageProcessor并调用其start,进行规则节点RuleNodeActor的创建
Plain Text //RuleChainActorMessageProcessor 100 if (!started) { RuleChain ruleChain = service.findRuleChainById(tenantId, entityId); if (ruleChain != null) { List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId); log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size()); // Creating and starting the actors; for (RuleNode ruleNode : ruleNodeList) { log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode); //创建规则节点 TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode); //加入到节点集合中 nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode)); } //初始化节点路由 initRoutes(ruleChain, ruleNodeList); started = true; } } else { onUpdate(context); } |
RuleNodeActor在init阶段,创建RuleNodeActorMessageProcessor并调用其start,进行TbNode的创建
Plain Text //RuleNodeActorMessageProcessor 62 //根据类型创建TbNode实例 tbNode = initComponent(ruleNode); if (tbNode != null) { state = ComponentLifecycleState.ACTIVE; } |
当与设备相关消息开始上传时,TenantActor还会初始化DeviceActor
Plain Text //TenantActor 153 case TRANSPORT_TO_DEVICE_ACTOR_MSG: //传递消息给DeviceActor,如果没有则创建 onToDeviceActorMsg((DeviceAwareMsg) msg, false); break; |
形成的结构如下:
消息传输
完成规则初始化后,规则引擎接受消息传输,以普通设备上传时序数据并存储为例,规则引擎处理的核心处理流程如下:
Plain Text //ActorSystemContext 561 //传递消息到AppActor邮箱中 appActor.tell(tbActorMsg); //AppActor 84 //根据消息类型转换消息为QueueToRuleEngineMsg,调用onQueueToRuleEngineMsg方法 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); //AppActor 140 //创建或获取租户Actor邮箱,并传递消息 getOrCreateTenantActor(msg.getTenantId()).tell(msg); //TenantActor 150 //根据消息类型转换消息为QueueToRuleEngineMsg,调用onQueueToRuleEngineMsg方法 onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); //TenantActor 185 //获取根规则链Actor邮箱,并传递消息 getRootChainActor().tell(msg); //RuleChainActor 55 //根据消息类型转换消息为QueueToRuleEngineMsg,使用处理器RuleChainActorMessageProcessor处理消息 processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); //RuleChainActorMessageProcessor 215 //如果消息中未指定规则节点,targetCtx为第一个节点邮箱,否则为指定节点邮箱 pushMsgToNode(targetCtx, msg, ""); //RuleChainActorMessageProcessor 338 //新建RuleChainToRuleNodeMsg消息,并向规则节点邮箱发送消息 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType)); //RuleNodeActor 60 //根据消息类型转换消息为RuleChainToRuleNodeMsg,调用onRuleChainToRuleNodeMsg处理该消息 onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg); //RuleNodeActor 94 //使用处理器RuleNodeActorMessageProcessor处理消息 //processor.onRuleChainToRuleNodeMsg(msg); //RuleNodeActorMessageProcessor 136 //调用规则节点实例处理消息 tbNode.onMsg(msg.getCtx(), msg.getMsg()); //TbDeviceProfileNode -> TbMsgTypeSwitchNode 消息处理流程 //TbDeviceProfileNode 135 //获取或创建设备状态DeviceState,处理消息 deviceState.process(ctx, msg); //DeviceState 140 //处理遥测数据 //stateChanged = processTelemetry(ctx, msg); //DeviceState 260 //调用上下文tellSuccess方法处理消息 //ctx.tellSuccess(msg); //DefaultTbContext 103 //调用tellNext处理关联关系为SUCCESS的消息。 tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null); //DefaultTbContext 121 //新建消息RuleNodeToRuleChainTellNextMsg,并传递给所在规则链Actor邮箱。 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); //RuleChainActor 58 //根据消息类型转换消息为RuleNodeToRuleChainTellNextMsg,使用处理器RuleChainActorMessageProcessor处理消息 processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); //RuleChainActorMessageProcessor 252 //根据消息来源编号originatorNodeId(一般是上一个节点的Id)和关联类型过滤关联联系 List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream() .filter(r -> contains(relationTypes, r.getType())) .collect(Collectors.toList()); //RuleChainActorMessageProcessor 282 //如果关联关系为1,调用pushToTarget到关联目标实体(一般是下一个节点,也有可能是下一个规则链) pushToTarget(tpi, msg, relation.getOut(), relation.getType()); //RuleChainActorMessageProcessor 304 //获取关联目标实体的Actor邮箱,调用pushMsgToNode方法处理消息 pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType); //RuleChainActorMessageProcessor 338 //新建消息RuleChainToRuleNodeMsg,向目标实体Actor邮箱发送该消息 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType)); //TbMsgTypeSwitchNode 101 //计算relationType(这里是`Post telemetry`),调用上下文tellNext处理消息 ctx.tellNext(msg, relationType); //TbMsgTypeSwitchNode-> TbMsgTimeseriesNode 消息处理流程类似 |
核心流程时序图(省掉一些非核心时序)如下:
为了防止蒙圈,提供一张核心流程示意图:
规则链加载
应用启动时,从DefaultActorService的PostConstruct的initActorSystem方法开始:
通过类DefaultActorSystem的createActor方法创建AppActor,该方法时创建Actor的实现,其他Actor也是通过该方法创建的。创建actor之后会执行actor中的initActor方法
AppActor的init方法:
定时向AppActor的mail box中发送一条消息;
而tryProcessQueue方法最终会执行actor的process方法
在创建租户Actor后,也执行TanentActor的init方法:
同样创建RuleChainActor后也会执行init方法:
进入processor.start方法:
然后查看RuleNodeActor的init方法,发现和RuleChainActor是一样的,定义在抽象类ComponentActor中,顺着查看到RuleNodeActorMessageProcessor的start方法:
Actor System
服务启用后,只有一个AppActor,但会包含所有的TalentActor;一个TalentActor有且只有一条RootRuleChainActor,同时可以有多条RuleChainActor;每个RuleChainActor可以包含多个RuleNodeActor和RuleChainActor
Actor执行
Actor的执行逻辑定义在process方法中:
通过TbActorRef类的tell方法将消息传递给Actor执行:
TIPS
- Actor模型 wiki
- 需要注意,在微服务架构下,Core服务和RuleEngine服务都拥有自己的Actor模型实现,Core包含AppActor、TenantActor以及DeviceActor,RuleEngine包含AppActor、TenantActor 、RuleEngine以及RuleNodeActor。
一条遥测数据的旅程
在TB租户下默认有一条【根规则链】,其中定义了默认情况下数据的处理流程,遥测数据也不例外,无论遥测数据是通过哪种协议还是通过API,都会进入到根规则链,默认流程如下:
遥测数据以此通过每个规则节点的处理,解析来我们结合代码深入了解一下。
根规则链的配置文件是:root_rule_chain.json。
提前设置告警规则,通过MQTTX模拟上报设备遥测数据。
物模型过滤(Model Filter Node)
JSON { "additionalInfo": { "description": "根据设备的物模型对报文进行测点/属性过滤", "layoutX": 310, "layoutY": 200 }, "type": "org.thingsboard.rule.engine.transform.TbTransformAndModelFilterMsgNode", "name": "Model filter node", "debugMode": false, "configuration": { } } |
在根规则链配置文件中可以看到【物模型过滤】规则节点的实现类【TbTransformAndModelFilterMsgNode】,该类集成【TbAbstractTransformNode】,属于转换类节点
规则节点的入口方法是onMsg:
TypeScript @Override public void onMsg(TbContext ctx, TbMsg msg) { boolean ifMsgTypeFilter = msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())||msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()); if(!(ifMsgTypeFilter && EntityType.DEVICE.name().equals(msg.getOriginator().getEntityType().name()))){ ctx.tellSuccess(msg); return; } withCallback(transform(ctx, msg), m -> transformSuccess(ctx, msg, m), t -> transformFailure(ctx, msg, t), ctx.getDbCallbackExecutor()); } |
如上图示,消息进入onMsg,可以看到消息类型是:POST_TELEMETRY_REQUEST
onMsy主要调用了transform方法:
TypeScript @Override protected ListenableFuture<List<TbMsg>> transform(TbContext ctx, TbMsg msg) { //ctx.logJsEvalRequest(); //return jsEngine.executeUpdateAsync(msg); return Futures.immediateFuture(Arrays.asList(msg)); } |
并没有任何转换,其中被注释的copy自【TbTransformMsgNode】消息脚本转换节点
所以这个转换器目前没有开发完,也可能是测试着玩的,先忽略。
设备配置文件(DeviceProviceNode)-生成告警
规则节点实现类是org.thingsboard.rule.engine.profile.TbDeviceProfileNode
作用是根据设备配置完成特殊处理,比如给设备定义了报警策略,则该节点则会校验报警条件,如果满足则生成告警。
消息进入onMsg方法后,根据消息类型不同执行不同的处理,而遥测数据将用于更新设备状态(DeviceState)
进入deviceState.process方法,然后根据消息类型执行processTelemetry方法
processTelemetry方法中,会遍历设备配置(deviceProfile)中的告警规则(alarmSetting)
进入alarmState.process方法
可以看到执行createOrClearAlarms方法,即生成或者清楚告警
由于我提前设置了规则:当亮度(brightness)大于50时则告警,而当前遥测数据中brightness=100,因此evalResult的结果是TRUE
然后继续执行calculateAlarmResult方法,该方法中会调用alarmService保存到数据库中alarm表
消息类型路由器(Message Type Switch)
规则节点实现类是:org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode
从根规则链图中可以看到不同的消息类型和relationType匹配,会路由到不同的规则节点,有5个对应的规则节点:
然后执行 DefaultTbContext#tellNext 方法,然后构造RuleNodeToRuleChainTellNextMsg消息并传入ChainActor
这里的ChainActor的类就是RuleChainActor,进入doProcess方法
进入该方法后,会查询到后面关联的节点有5个:
过滤出目标的关联节点,并转发到保存时序数据的规则节点
保存时序数据(Save Timeseries)
规则节点实现类是:org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode
消息进入onMsg方法,最终调用时序数据库Service保存数据
保存时序数据过程中,还会触发websocket更新,详情参考:TingsBoard源码解析-WebSocket_imagine0623的博客-CSDN博客