Enrichment nodes利用数据可给节点增加(丰富)元数据信息?这些节点可以丰富消息的属性、最新的时间序列值、历史时间序列数据以及从各种来源包括消息发送者、相关实体、当前租户或客户获取的实体详细信息。Transformation nodes修改现有消息或创建新消息Transformation nodes are the data processing and manipulation(操纵) components of ThingsBoard’s rule engine that modify the content, structure, or format of incoming messages.Action nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/action/Perform various actions based on configuration and message根据配置和消息执行各种操作创建警报 — 为消息发送者创建新警报或更新现有的活动警报。清除警报 — 清除消息发送者现有的活动警报。计算字段 — 触发时间序列或属性数据的计算字段处理而不将原始数据持久化到数据库中。删除关系 — 根据类型和方向从所选实体到消息发送者的关系中删除关系。External nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/External nodes are the integration components of ThingsBoard’s rule engine that send messages to third-party services and external systems.AI 请求 — 向大型语言模型发送带有可定制提示和可选文件附件的请求将 AI 生成的响应作为传出消息数据返回。MQTT — 使用 QoS 1至少一次将传入的消息数据发布到外部 MQTT 代理支持动态主题模式、多种身份验证方法和 TLS/SSL 加密。Flow nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/flow/流程节点控制消息如何在规则引擎的不同部分之间移动。 这些节点可以在队列之间传输消息以实现顺序处理或分离不同的工作负载将消息转发到其他规则链并将结果返回给调用规则链。规则链 — 将传入的消息转发到指定的规则链。检查点 — 将传入的消息转移到指定的队列。在规则链上下文中拿设备属性使用Enrichment - originator attributes节点, 它会往 Metadata 添加设备属性, 注意key有前缀的服务端属性是:ss_规则链处理源码分析关键知识Actor 模型首先在 ThingsBoard 中这套自研框架遵循一个叫 Actor 模型概念在计算机科学中演员模型英语Actor model是一种并发运算上的模型。“演员”是一种程序上的抽象概念被视为并发运算的基本单元当一个演员接收到一则消息它可以做出一些决策、建立更多的演员、发送更多的消息、决定要如何回答接下来的消息。演员可以修改它们自己的私有状态但是只能通过消息间接的相互影响避免了基于锁的同步。演员模型推崇的哲学是“一切皆是演员”这与面向对象编程的“一切皆是对象”类似。演员是一个运算实体响应接收到的消息相互间是并发的发送有限数量的消息给其他演员创建有限数量的新演员指定接收到下一个消息时要用到的行为。以上动作不含有顺序执行的假设因此可以并行进行。https://zh.wikipedia.org/zh-cn/演员模型Actor 的三大概念演员Actor演员是模型中的基本计算实体每个演员都是一个独立、自主的单元具备以下能力封装私有状态每个演员维护自己的内部状态其他演员无法直接访问。接收消息通过邮箱Mailbox接收来自其他演员的消息。发送消息可以向其他演员异步发送有限数量的消息。创建新演员可以动态创建有限数量的新演员。改变行为在接收到消息后可以决定如何响应下一条消息即改变自身的行为。上述操作没有预设的执行顺序可以并行进行。消息传递Message Passing消息是演员之间通信的唯一方式具有以下特征异步性发送方无需等待接收方处理消息发送者与通信完全解耦。不可变性消息通常被封装为不可变的数据结构确保发送方和接收方不会共享可变状态。地址识别消息接收者通过邮件地址进行识别演员只能与它拥有地址的演员通信。邮箱Mailbox每个演员都有一个邮箱用于存储接收到的消息。演员按顺序从邮箱中取出消息进行处理这确保了状态变化以受控的方式进行同时也管理了消息传递的异步性。ThingsBoard 中的 Actor 模型Actor 包含三大核心组成部分状态State、行为Behavior和邮箱Mailbox。它将系统中的所有功能模块抽象为独立的 “Actor”(演员)单元。每个 Actor 内部维护自己的变量信息状态且状态仅由 Actor 自身管理。Actor 之间不共享内存完全通过消息传递进行通信。Actor 之间的通信桥梁是邮箱Mailbox。邮箱内部采用 FIFO先进先出的消息队列来存储消息。发送方 Actor 通过tell方法将消息异步投递到接收方 Actor 的邮箱中接收方则按照顺序从邮箱中获取消息并触发相应的计算逻辑行为来改变自身状态。ThingsBoard 的 Actor 按照严格的层级结构进行组织。根节点是App Actor它负责管理下属的所有租户 ActorTenant Actor进而管理该租户下的设备 Actor 和规则链 Actor。[[N_ThingsBoard 业务源码分析]]对于规则链消息是从集群订阅而来的, 调用栈:org.thingsboard.server.service.queue.DefaultTbCoreConsumerService#forwardToDeviceActororg.thingsboard.server.actors.ActorSystemContext#tellorg.thingsboard.server.actors.TbActorMailbox#tell(org.thingsboard.server.common.msg.TbActorMsg)org.thingsboard.server.actors.TbActorMailbox#enqueueorg.thingsboard.server.actors.TbActorMailbox#tryProcessQueueorg.thingsboard.server.actors.TbActorMailbox#processMailboxorg.thingsboard.server.actors.app.AppActor#doProcess最终到该方法org.thingsboard.server.actors.ruleChain.RuleChainActor#doProcessOverride protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); break; case QUEUE_TO_RULE_ENGINE_MSG: // 队列到规则链 processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:// 规则链 到 规则链? processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); break; case RULE_CHAIN_TO_RULE_CHAIN_MSG: processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg); break; case RULE_CHAIN_INPUT_MSG: processor.onRuleChainInputMsg((RuleChainInputMsg) msg); break; case RULE_CHAIN_OUTPUT_MSG: processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg); break; case PARTITION_CHANGE_MSG: processor.onPartitionChangeMsg((PartitionChangeMsg) msg); break; case STATS_PERSIST_TICK_MSG: onStatsPersistTick(id); break; default: return false; } return true; }org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#onQueueToRuleEngineMsg/** * 接收来自队列的消息入口由 RuleChainActor.doProcess 分发 * 区分两种情况 * 1. relationTypes 为空 → 全新消息从 firstNode 开始执行 * 2. relationTypes 非空 → 重试/恢复消息从消息携带的 ruleNodeId 位置继续执行 */ void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) { TbMsg msg envelope.getMsg(); // 校验消息是否已过期callback 被取消 if (!checkMsgValid(msg)) { return; } log.trace([{}][{}] Processing message [{}]: {}, entityId, firstId, msg.getId(), msg); if (envelope.getRelationTypes() null || envelope.getRelationTypes().isEmpty()) { // 首次进入规则链useRuleNodeIdFromMsgtrue但 msg.getRuleNodeId()null → 路由到 firstNode onTellNext(msg, true); } else { // 消息在某节点处理后通过 tellNext 重入或从队列重试恢复携带目标 nodeId 和 relationType onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage()); } }首次入口/** * 将消息路由到具体 RuleNode * param useRuleNodeIdFromMsg true从 msg.getRuleNodeId() 查找目标节点null 时使用 firstNode */ private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) { try { // 检查规则链组件是否处于 ACTIVE 状态否则抛出 RuleNodeException checkComponentStateActive(msg); RuleNodeId targetId useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null; RuleNodeCtx targetCtx; if (targetId null) { // 没有指定目标节点 → 从入口节点 firstNode 开始 // 同时重置 msg 上的 ruleNodeId绑定当前 ruleChainId targetCtx firstNode;//这个是 org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode msg msg.copy() .ruleChainId(entityId) .resetRuleNodeId() .build(); } else { // 已有目标节点 ID如 tellNext 后恢复执行 targetCtx nodeActors.get(targetId); } if (targetCtx ! null) { log.trace([{}][{}] Pushing message to target rule node, entityId, targetId); // 构建 DefaultTbContext 并 tell RuleChainToRuleNodeMsg 到目标 RuleNodeActor pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE); } else { // 目标节点不存在可能已被删除直接 ack 成功不阻塞批处理 log.trace([{}][{}] Rule node does not exist. Probably old message, entityId, targetId); msg.getCallback().onSuccess(); } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { msg.getCallback().onFailure(new RuleEngineException(e.getMessage(), e)); } }节点重入org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#onTellNext(org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.common.data.id.RuleNodeId, java.util.Setjava.lang.String, java.lang.String)/** * 核心路由逻辑根据 originatorNodeId 查找 nodeRoutes按 relationType 过滤出下游节点列表 * 三种分支 * 0 个下游 → 终止节点回调 onSuccess 或 onFailure流程结束 * 1 个下游 → 直接 pushToTarget同分区内存投递 或 跨分区入队 * N 个下游 → 用 MultipleTbQueueTbMsgCallbackWrapper 包装对每个下游副本独立回调 */ private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, SetString relationTypes, String failureMessage) { try { checkComponentStateActive(msg); EntityId entityId msg.getOriginator(); // 解析消息归属的分区用于判断是否需要跨分区入队 TopicPartitionInfo tpi systemContext.resolve(tenantId, entityId, msg); ListRuleNodeRelation ruleNodeRelations nodeRoutes.get(originatorNodeId); if (ruleNodeRelations null) { log.warn([{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message., tenantId, entityId, msg.getId()); ruleNodeRelations Collections.emptyList(); } // 按 relationType 过滤出匹配的下游连线大小写不敏感 ListRuleNodeRelation relationsByTypes ruleNodeRelations.stream() .filter(r - contains(relationTypes, r.getType())) .collect(Collectors.toList()); int relationsCount relationsByTypes.size(); if (relationsCount 0) {// 0 个下游 // 当前节点是叶子节点无匹配下游 log.trace([{}][{}][{}] No outbound relations to process, tenantId, entityId, msg.getId()); if (relationTypes.contains(TbNodeConnectionType.FAILURE)) { // 以 Failure relationType 到达叶子节点 → 整条消息处理失败 // → callback.onFailure() → packCtx.onFailure() → pendingCount-- RuleNodeCtx ruleNodeCtx nodeActors.get(originatorNodeId); if (ruleNodeCtx ! null) { msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf())); } else { log.debug([{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info, entityId, originatorNodeId.getId()); msg.getCallback().onFailure(new RuleEngineException(Failure during message processing by Rule Node [ originatorNodeId.getId().toString() ])); } } else { // 以 Success/其他 relationType 到达叶子节点 → 消息正常结束 // → callback.onSuccess() → packCtx.onSuccess() → pendingCount-- // → 若 pendingCount0 则 latch.countDown()processMsgs await() 解除阻塞 msg.getCallback().onSuccess(); } } else if (relationsCount 1) {// 1 个下游 for (RuleNodeRelation relation : relationsByTypes) { log.trace([{}][{}][{}] Pushing message to single target: [{}], tenantId, entityId, msg.getId(), relation.getOut()); // 单下游直接路由callback 不复制 pushToTarget(tpi, msg, relation.getOut(), relation.getType()); } } else {//N 个下游 // 多下游如同时连 Success→A 和 Success→B // 用 MultipleTbQueueTbMsgCallbackWrapper 包装原 callback需要全部副本都完成才触发原 callback MultipleTbQueueTbMsgCallbackWrapper callbackWrapper new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); log.trace([{}][{}][{}] Pushing message to multiple targets: [{}], tenantId, entityId, msg.getId(), relationsByTypes); for (RuleNodeRelation relation : relationsByTypes) { EntityId target relation.getOut(); // 多下游场景必须走队列为每个分支复制 msg 新 UUID避免并发修改同一对象 putToQueue(tpi, msg, callbackWrapper, target); } } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { log.warn([ tenantId ] [ entityId ] [ msg.getId() ] onTellNext failure, e); msg.getCallback().onFailure(new RuleEngineException(onTellNext - e.getMessage(), e)); } }一个下游org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#pushToTarget/** * 单下游路由分发 * 同分区直接内存投递pushMsgToNode / RuleChainToRuleChainMsg零序列化开销 * 跨分区序列化写队列由目标分区的 consumerLoop 消费后继续执行 */ private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) { if (tpi.isMyPartition()) { switch (target.getEntityType()) { case RULE_NODE: // 同分区内直接 tell → RuleNodeActor.onRuleChainToRuleNodeMsg() pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType); break; case RULE_CHAIN: // 跳转到其他规则链tell TenantActor → 目标 RuleChainActor.onRuleChainToRuleChainMsg() parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType)); break; } } else { // 目标节点在其他分区集群模式写入队列等待目标分区消费者处理 putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target); } }多个下游org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#putToQueue(org.thingsboard.server.common.msg.queue.TopicPartitionInfo, org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.queue.TbQueueCallback, org.thingsboard.server.common.data.id.EntityId)/** * 多下游场景为目标节点复制消息新 UUID序列化后写入队列 * 下游是 RULE_NODE → 记录目标 ruleNodeId消费后直接从该节点恢复执行 * 下游是 RULE_CHAIN → 记录目标 ruleChainId消费后从目标链 firstNode 开始 */ private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) { switch (target.getEntityType()) { case RULE_NODE: putToQueue(tpi, msg.copy() .id(UUID.randomUUID()) .ruleChainId(entityId) .ruleNodeId(new RuleNodeId(target.getId())) .build(), callbackWrapper); break; case RULE_CHAIN: putToQueue(tpi, msg.copy() .id(UUID.randomUUID()) .ruleChainId(new RuleChainId(target.getId())) .resetRuleNodeId() .build(), callbackWrapper); break; } }org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#putToQueue(org.thingsboard.server.common.msg.queue.TopicPartitionInfo, org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.queue.TbQueueCallback)/** * 将消息序列化为 ToRuleEngineMsg proto通过 clusterService 写入目标 topic/partition * 对应消费端TbRuleEngineQueueConsumerManager.processMsgs() → submitMessage() */ private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) { ToRuleEngineMsg toQueueMsg ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsgProto(TbMsg.toProto(newMsg)) .build(); clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper); }对于路由过来的规则链/** * 规则链路由到本节点的消息入口主处理路径 * 由 RuleChainActorMessageProcessor.pushToTarget() 通过 Actor tell 触发 * * 流程 * 1. 单例节点分区检查若本节点配置了 singletonMode 且当前服务不是该分区的负责者 * → 调用 putToNodePartition() 将消息序列化后重新入队路由到正确的服务实例 * 2. 若在本节点所在分区 * a. callback.onProcessingStart(info)记录当前节点为最后访问节点供超时日志使用 * b. checkComponentStateActive节点未激活初始化失败/已停止时直接 callback.onFailure * c. 统计本条消息已经过的 RuleNode 数量超过租户配置上限时直接 callback.onFailure * d. tbNode.onMsg(ctx, tbMsg)调用具体规则节点实现执行业务逻辑 * 节点实现完成后通过 ctx.tellSuccess / ctx.tellNext / ctx.tellFailure / ctx.ack 输出结果 * e. 若 tbNode.onMsg() 抛出异常调用 ctx.tellFailure() 向 FAILURE 链路由 */ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { if (!isMyNodePartition()) { // 单例模式singletonModetrue且本实例不是负责分区 → 转发到正确分区 putToNodePartition(msg.getMsg()); } else { // 记录最后访问节点信息用于超时/失败时的日志诊断 msg.getMsg().getCallback().onProcessingStart(info); // 节点未就绪时立即失败不调用 tbNode checkComponentStateActive(msg.getMsg()); TbMsg tbMsg msg.getMsg(); // 累加节点执行计数防止消息在规则链中无限循环 int ruleNodeCount tbMsg.getAndIncrementRuleNodeCounter(); var tenantProfileConfiguration getTenantProfileConfiguration(); int maxRuleNodeExecutionsPerMessage tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); if (maxRuleNodeExecutionsPerMessage 0 || ruleNodeCount maxRuleNodeExecutionsPerMessage) { // 上报 API 用量统计用于配额控制 apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); // Debug 模式下持久化输入事件供规则链调试界面展示 persistDebugInputIfAllowed(msg.getMsg(), msg.getFromRelationType()); try { // 核心调用执行具体规则节点逻辑如 Save Timeseries / Filter / Transform 等 // 节点内部通过 ctx.tellSuccess/tellNext/tellFailure/ack 将处理结果回传给规则链 tbNode.onMsg(msg.getCtx(), msg.getMsg()); } catch (Exception e) { // 节点执行异常 → 路由到 FAILURE 链最终可能触发 callback.onFailure msg.getCtx().tellFailure(msg.getMsg(), e); } } else { // 超过最大节点执行次数直接标记消息失败防止死循环 tbMsg.getCallback().onFailure(new RuleNodeException(Message is processed by more than maxRuleNodeExecutionsPerMessage rule nodes!, ruleChainName, ruleNode)); } } }是别的分区节点处理的 转发到正确分区org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor#putToNodePartition/** * 将消息路由到正确服务实例上的节点分区单例节点跨服务转发 * * 仅在 singletonModetrue 且当前服务不负责该节点分区时触发。 * 典型场景集群部署中某个规则节点设为全局单例只在特定服务实例运行 * 其他实例收到消息时需将消息转发过去。 * 流程 * 1. 拷贝消息并将 ruleNodeId 设为本节点 ID确保目标服务路由到正确节点 * 2. 解析节点 ID 对应的 TPI目标分区该分区属于负责运行此单例节点的服务实例 * 3. 序列化并推送到 Rule Engine 队列目标实例消费后再次进入 onRuleChainToRuleNodeMsg * 4. ack(source)释放当前消息的 callback 计数原消息在本实例已完成由目标实例重新追踪 * // 消息处理完成后会重新路由回来参见 RuleChainActorMessageProcessor.pushToTarget */ private void putToNodePartition(TbMsg source) { // 拷贝消息将 ruleNodeId 记录为本节点目标实例据此找到对应 RuleNodeActor TbMsg tbMsg TbMsg.newMsg(source, source.getQueueName(), source.getRuleChainId(), entityId); // 以节点 ID 为 originator 解析分区得到负责运行该单例节点的服务实例的分区 TopicPartitionInfo tpi systemContext.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, ruleNode.getId()); TransportProtos.ToRuleEngineMsg toQueueMsg TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsgProto(TbMsg.toProto(tbMsg)) .build(); // 推送到队列由目标分区的服务实例消费处理 systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null); // 原消息在本实例已完成流转转发出去调用 ack 释放 packCtx 计数 defaultCtx.ack(source); }对于路由过来的节点/** * 处理节点向自身发送的延迟消息ctx.tellSelf(msg, delayMs) * 常用于需要定时重试或延迟执行的场景如等待设备响应后重新评估 * 处理逻辑与 onRuleChainToRuleNodeMsg 相同但不需要分区检查必然在本节点 */ public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception { checkComponentStateActive(msg.getMsg()); TbMsg tbMsg msg.getMsg(); int ruleNodeCount tbMsg.getAndIncrementRuleNodeCounter(); var tenantProfileConfiguration getTenantProfileConfiguration(); int maxRuleNodeExecutionsPerMessage tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); if (maxRuleNodeExecutionsPerMessage 0 || ruleNodeCount maxRuleNodeExecutionsPerMessage) { apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); persistDebugInputIfAllowed(msg.getMsg(), Self); try { tbNode.onMsg(defaultCtx, msg.getMsg()); } catch (Exception e) { defaultCtx.tellFailure(msg.getMsg(), e); } } else { tbMsg.getCallback().onFailure(new RuleNodeException(Message is processed by more than maxRuleNodeExecutionsPerMessage rule nodes!, ruleChainName, ruleNode)); } }对于 Debugr 模式的 保存调试数据org.thingsboard.server.actors.ruleChain.DefaultTbContext#tellNext(org.thingsboard.server.common.msg.TbMsg, java.util.Setjava.lang.String)/** * 规则节点核心输出方法将消息按指定关系类型路由到下游节点 * 由规则节点实现TbNode在完成业务逻辑后调用 * * 流程 * 1. persistDebugOutputDebug 模式下持久化输出事件供前端调试界面展示 * 2. callback.onProcessingEnd(ruleNodeId)通知 TbMsgPackProcessingContext 本节点处理完毕 * 更新最后访问节点信息用于超时日志注意此时消息并未计入完成仍在 pendingMap 中 * 3. tell RuleNodeToRuleChainTellNextMsg 到 RuleChainActor * 由 RuleChainActorMessageProcessor.onTellNext() 根据 relationTypes 查找下游节点并继续路由 * 若没有下游节点 → 触发 callback.onSuccess()消息从 pendingMap 移除packCtx 计数减一 */ Override public void tellNext(TbMsg msg, SetString relationTypes) { RuleNode ruleNode nodeCtx.getSelf(); persistDebugOutput(msg, relationTypes); // 标记当前节点处理完毕更新诊断信息不触发 packCtx 计数 msg.getCallback().onProcessingEnd(ruleNode.getId()); // 将路由决策交还给规则链 Actor由其负责查找并分发到下游节点 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(), ruleNode.getId(), relationTypes, msg, null)); }内存和Kafka队列关于 QueueFactory 选择由Spring 启动时注入Factory条件InMemoryMonolithQueueFactoryqueue.typein-memoryservice.typemonolithKafkaMonolithQueueFactoryqueue.typekafkaservice.typemonolithKafkaTbRuleEngineQueueFactoryqueue.typekafkaservice.typetb-rule-engine自定义节点https://thingsboard.io/docs/user-guide/contribution/rule-node-development/核心概念在 ThingsBoard 中每条数据 → 被包装成TbMsg每个节点 → 处理一条消息输出 → 发送给下一个节点带 relationThingsBoard类比Rule Chain工作流 DAGRule Node函数节点TbMsg消息对象relationType分支条件节点生命周期init() → 初始化解析配置 onMsg() → 处理消息核心写逻辑的地方 destroy() → 销毁核心逻辑onMsg真正写业务逻辑的地方Override public void onMsg(TbContext ctx, TbMsg msg) { // 处理 msg }ThingsBoard 不会自动继续流程必须手动ctx.tellSuccess(msg);或者ctx.tellNext(msg, True);另外, 还有两种流转ctx.tellSelf// 只用于延迟一段时间后叫醒自己1个/节点(单实例只有一个)ctx.enqueueForTellNext//它走的是持久化的消息队列Kafka/内存队列通常用于触发下游节点N个/触发批次(与设备数相关但异步消费)节点定义RuleNode 注解这是“声明元数据”类似前端组件注册/** * Copyright © 2016-2026 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentScope; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; Retention(RetentionPolicy.RUNTIME) Target(ElementType.TYPE) public interface RuleNode { ComponentType type(); String name(); String nodeDescription(); String nodeDetails(); Class? extends NodeConfiguration configClazz(); ComponentClusteringMode clusteringMode() default ComponentClusteringMode.ENABLED; boolean hasQueueName() default false; boolean inEnabled() default true; boolean outEnabled() default true; ComponentScope scope() default ComponentScope.TENANT; String[] relationTypes() default {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE}; String[] uiResources() default {}; String configDirective() default ; String icon() default ; String iconUrl() default ; String docUrl() default ; boolean customRelations() default false;//自定义类型分支, 否则只有 Success 或者 Failual boolean ruleChainNode() default false; RuleChainType[] ruleChainTypes() default {RuleChainType.CORE, RuleChainType.EDGE}; int version() default 0; }
ThingsBoard 规则链系统源码分析和自定义定时器
Enrichment nodes利用数据可给节点增加(丰富)元数据信息?这些节点可以丰富消息的属性、最新的时间序列值、历史时间序列数据以及从各种来源包括消息发送者、相关实体、当前租户或客户获取的实体详细信息。Transformation nodes修改现有消息或创建新消息Transformation nodes are the data processing and manipulation(操纵) components of ThingsBoard’s rule engine that modify the content, structure, or format of incoming messages.Action nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/action/Perform various actions based on configuration and message根据配置和消息执行各种操作创建警报 — 为消息发送者创建新警报或更新现有的活动警报。清除警报 — 清除消息发送者现有的活动警报。计算字段 — 触发时间序列或属性数据的计算字段处理而不将原始数据持久化到数据库中。删除关系 — 根据类型和方向从所选实体到消息发送者的关系中删除关系。External nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/external/External nodes are the integration components of ThingsBoard’s rule engine that send messages to third-party services and external systems.AI 请求 — 向大型语言模型发送带有可定制提示和可选文件附件的请求将 AI 生成的响应作为传出消息数据返回。MQTT — 使用 QoS 1至少一次将传入的消息数据发布到外部 MQTT 代理支持动态主题模式、多种身份验证方法和 TLS/SSL 加密。Flow nodeshttps://thingsboard.io/docs/user-guide/rule-engine-2-0/nodes/flow/流程节点控制消息如何在规则引擎的不同部分之间移动。 这些节点可以在队列之间传输消息以实现顺序处理或分离不同的工作负载将消息转发到其他规则链并将结果返回给调用规则链。规则链 — 将传入的消息转发到指定的规则链。检查点 — 将传入的消息转移到指定的队列。在规则链上下文中拿设备属性使用Enrichment - originator attributes节点, 它会往 Metadata 添加设备属性, 注意key有前缀的服务端属性是:ss_规则链处理源码分析关键知识Actor 模型首先在 ThingsBoard 中这套自研框架遵循一个叫 Actor 模型概念在计算机科学中演员模型英语Actor model是一种并发运算上的模型。“演员”是一种程序上的抽象概念被视为并发运算的基本单元当一个演员接收到一则消息它可以做出一些决策、建立更多的演员、发送更多的消息、决定要如何回答接下来的消息。演员可以修改它们自己的私有状态但是只能通过消息间接的相互影响避免了基于锁的同步。演员模型推崇的哲学是“一切皆是演员”这与面向对象编程的“一切皆是对象”类似。演员是一个运算实体响应接收到的消息相互间是并发的发送有限数量的消息给其他演员创建有限数量的新演员指定接收到下一个消息时要用到的行为。以上动作不含有顺序执行的假设因此可以并行进行。https://zh.wikipedia.org/zh-cn/演员模型Actor 的三大概念演员Actor演员是模型中的基本计算实体每个演员都是一个独立、自主的单元具备以下能力封装私有状态每个演员维护自己的内部状态其他演员无法直接访问。接收消息通过邮箱Mailbox接收来自其他演员的消息。发送消息可以向其他演员异步发送有限数量的消息。创建新演员可以动态创建有限数量的新演员。改变行为在接收到消息后可以决定如何响应下一条消息即改变自身的行为。上述操作没有预设的执行顺序可以并行进行。消息传递Message Passing消息是演员之间通信的唯一方式具有以下特征异步性发送方无需等待接收方处理消息发送者与通信完全解耦。不可变性消息通常被封装为不可变的数据结构确保发送方和接收方不会共享可变状态。地址识别消息接收者通过邮件地址进行识别演员只能与它拥有地址的演员通信。邮箱Mailbox每个演员都有一个邮箱用于存储接收到的消息。演员按顺序从邮箱中取出消息进行处理这确保了状态变化以受控的方式进行同时也管理了消息传递的异步性。ThingsBoard 中的 Actor 模型Actor 包含三大核心组成部分状态State、行为Behavior和邮箱Mailbox。它将系统中的所有功能模块抽象为独立的 “Actor”(演员)单元。每个 Actor 内部维护自己的变量信息状态且状态仅由 Actor 自身管理。Actor 之间不共享内存完全通过消息传递进行通信。Actor 之间的通信桥梁是邮箱Mailbox。邮箱内部采用 FIFO先进先出的消息队列来存储消息。发送方 Actor 通过tell方法将消息异步投递到接收方 Actor 的邮箱中接收方则按照顺序从邮箱中获取消息并触发相应的计算逻辑行为来改变自身状态。ThingsBoard 的 Actor 按照严格的层级结构进行组织。根节点是App Actor它负责管理下属的所有租户 ActorTenant Actor进而管理该租户下的设备 Actor 和规则链 Actor。[[N_ThingsBoard 业务源码分析]]对于规则链消息是从集群订阅而来的, 调用栈:org.thingsboard.server.service.queue.DefaultTbCoreConsumerService#forwardToDeviceActororg.thingsboard.server.actors.ActorSystemContext#tellorg.thingsboard.server.actors.TbActorMailbox#tell(org.thingsboard.server.common.msg.TbActorMsg)org.thingsboard.server.actors.TbActorMailbox#enqueueorg.thingsboard.server.actors.TbActorMailbox#tryProcessQueueorg.thingsboard.server.actors.TbActorMailbox#processMailboxorg.thingsboard.server.actors.app.AppActor#doProcess最终到该方法org.thingsboard.server.actors.ruleChain.RuleChainActor#doProcessOverride protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); break; case QUEUE_TO_RULE_ENGINE_MSG: // 队列到规则链 processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:// 规则链 到 规则链? processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); break; case RULE_CHAIN_TO_RULE_CHAIN_MSG: processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg); break; case RULE_CHAIN_INPUT_MSG: processor.onRuleChainInputMsg((RuleChainInputMsg) msg); break; case RULE_CHAIN_OUTPUT_MSG: processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg); break; case PARTITION_CHANGE_MSG: processor.onPartitionChangeMsg((PartitionChangeMsg) msg); break; case STATS_PERSIST_TICK_MSG: onStatsPersistTick(id); break; default: return false; } return true; }org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#onQueueToRuleEngineMsg/** * 接收来自队列的消息入口由 RuleChainActor.doProcess 分发 * 区分两种情况 * 1. relationTypes 为空 → 全新消息从 firstNode 开始执行 * 2. relationTypes 非空 → 重试/恢复消息从消息携带的 ruleNodeId 位置继续执行 */ void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) { TbMsg msg envelope.getMsg(); // 校验消息是否已过期callback 被取消 if (!checkMsgValid(msg)) { return; } log.trace([{}][{}] Processing message [{}]: {}, entityId, firstId, msg.getId(), msg); if (envelope.getRelationTypes() null || envelope.getRelationTypes().isEmpty()) { // 首次进入规则链useRuleNodeIdFromMsgtrue但 msg.getRuleNodeId()null → 路由到 firstNode onTellNext(msg, true); } else { // 消息在某节点处理后通过 tellNext 重入或从队列重试恢复携带目标 nodeId 和 relationType onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage()); } }首次入口/** * 将消息路由到具体 RuleNode * param useRuleNodeIdFromMsg true从 msg.getRuleNodeId() 查找目标节点null 时使用 firstNode */ private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) { try { // 检查规则链组件是否处于 ACTIVE 状态否则抛出 RuleNodeException checkComponentStateActive(msg); RuleNodeId targetId useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null; RuleNodeCtx targetCtx; if (targetId null) { // 没有指定目标节点 → 从入口节点 firstNode 开始 // 同时重置 msg 上的 ruleNodeId绑定当前 ruleChainId targetCtx firstNode;//这个是 org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode msg msg.copy() .ruleChainId(entityId) .resetRuleNodeId() .build(); } else { // 已有目标节点 ID如 tellNext 后恢复执行 targetCtx nodeActors.get(targetId); } if (targetCtx ! null) { log.trace([{}][{}] Pushing message to target rule node, entityId, targetId); // 构建 DefaultTbContext 并 tell RuleChainToRuleNodeMsg 到目标 RuleNodeActor pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE); } else { // 目标节点不存在可能已被删除直接 ack 成功不阻塞批处理 log.trace([{}][{}] Rule node does not exist. Probably old message, entityId, targetId); msg.getCallback().onSuccess(); } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { msg.getCallback().onFailure(new RuleEngineException(e.getMessage(), e)); } }节点重入org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#onTellNext(org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.common.data.id.RuleNodeId, java.util.Setjava.lang.String, java.lang.String)/** * 核心路由逻辑根据 originatorNodeId 查找 nodeRoutes按 relationType 过滤出下游节点列表 * 三种分支 * 0 个下游 → 终止节点回调 onSuccess 或 onFailure流程结束 * 1 个下游 → 直接 pushToTarget同分区内存投递 或 跨分区入队 * N 个下游 → 用 MultipleTbQueueTbMsgCallbackWrapper 包装对每个下游副本独立回调 */ private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, SetString relationTypes, String failureMessage) { try { checkComponentStateActive(msg); EntityId entityId msg.getOriginator(); // 解析消息归属的分区用于判断是否需要跨分区入队 TopicPartitionInfo tpi systemContext.resolve(tenantId, entityId, msg); ListRuleNodeRelation ruleNodeRelations nodeRoutes.get(originatorNodeId); if (ruleNodeRelations null) { log.warn([{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message., tenantId, entityId, msg.getId()); ruleNodeRelations Collections.emptyList(); } // 按 relationType 过滤出匹配的下游连线大小写不敏感 ListRuleNodeRelation relationsByTypes ruleNodeRelations.stream() .filter(r - contains(relationTypes, r.getType())) .collect(Collectors.toList()); int relationsCount relationsByTypes.size(); if (relationsCount 0) {// 0 个下游 // 当前节点是叶子节点无匹配下游 log.trace([{}][{}][{}] No outbound relations to process, tenantId, entityId, msg.getId()); if (relationTypes.contains(TbNodeConnectionType.FAILURE)) { // 以 Failure relationType 到达叶子节点 → 整条消息处理失败 // → callback.onFailure() → packCtx.onFailure() → pendingCount-- RuleNodeCtx ruleNodeCtx nodeActors.get(originatorNodeId); if (ruleNodeCtx ! null) { msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf())); } else { log.debug([{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info, entityId, originatorNodeId.getId()); msg.getCallback().onFailure(new RuleEngineException(Failure during message processing by Rule Node [ originatorNodeId.getId().toString() ])); } } else { // 以 Success/其他 relationType 到达叶子节点 → 消息正常结束 // → callback.onSuccess() → packCtx.onSuccess() → pendingCount-- // → 若 pendingCount0 则 latch.countDown()processMsgs await() 解除阻塞 msg.getCallback().onSuccess(); } } else if (relationsCount 1) {// 1 个下游 for (RuleNodeRelation relation : relationsByTypes) { log.trace([{}][{}][{}] Pushing message to single target: [{}], tenantId, entityId, msg.getId(), relation.getOut()); // 单下游直接路由callback 不复制 pushToTarget(tpi, msg, relation.getOut(), relation.getType()); } } else {//N 个下游 // 多下游如同时连 Success→A 和 Success→B // 用 MultipleTbQueueTbMsgCallbackWrapper 包装原 callback需要全部副本都完成才触发原 callback MultipleTbQueueTbMsgCallbackWrapper callbackWrapper new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); log.trace([{}][{}][{}] Pushing message to multiple targets: [{}], tenantId, entityId, msg.getId(), relationsByTypes); for (RuleNodeRelation relation : relationsByTypes) { EntityId target relation.getOut(); // 多下游场景必须走队列为每个分支复制 msg 新 UUID避免并发修改同一对象 putToQueue(tpi, msg, callbackWrapper, target); } } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { log.warn([ tenantId ] [ entityId ] [ msg.getId() ] onTellNext failure, e); msg.getCallback().onFailure(new RuleEngineException(onTellNext - e.getMessage(), e)); } }一个下游org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#pushToTarget/** * 单下游路由分发 * 同分区直接内存投递pushMsgToNode / RuleChainToRuleChainMsg零序列化开销 * 跨分区序列化写队列由目标分区的 consumerLoop 消费后继续执行 */ private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) { if (tpi.isMyPartition()) { switch (target.getEntityType()) { case RULE_NODE: // 同分区内直接 tell → RuleNodeActor.onRuleChainToRuleNodeMsg() pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType); break; case RULE_CHAIN: // 跳转到其他规则链tell TenantActor → 目标 RuleChainActor.onRuleChainToRuleChainMsg() parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType)); break; } } else { // 目标节点在其他分区集群模式写入队列等待目标分区消费者处理 putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target); } }多个下游org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#putToQueue(org.thingsboard.server.common.msg.queue.TopicPartitionInfo, org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.queue.TbQueueCallback, org.thingsboard.server.common.data.id.EntityId)/** * 多下游场景为目标节点复制消息新 UUID序列化后写入队列 * 下游是 RULE_NODE → 记录目标 ruleNodeId消费后直接从该节点恢复执行 * 下游是 RULE_CHAIN → 记录目标 ruleChainId消费后从目标链 firstNode 开始 */ private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) { switch (target.getEntityType()) { case RULE_NODE: putToQueue(tpi, msg.copy() .id(UUID.randomUUID()) .ruleChainId(entityId) .ruleNodeId(new RuleNodeId(target.getId())) .build(), callbackWrapper); break; case RULE_CHAIN: putToQueue(tpi, msg.copy() .id(UUID.randomUUID()) .ruleChainId(new RuleChainId(target.getId())) .resetRuleNodeId() .build(), callbackWrapper); break; } }org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor#putToQueue(org.thingsboard.server.common.msg.queue.TopicPartitionInfo, org.thingsboard.server.common.msg.TbMsg, org.thingsboard.server.queue.TbQueueCallback)/** * 将消息序列化为 ToRuleEngineMsg proto通过 clusterService 写入目标 topic/partition * 对应消费端TbRuleEngineQueueConsumerManager.processMsgs() → submitMessage() */ private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) { ToRuleEngineMsg toQueueMsg ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsgProto(TbMsg.toProto(newMsg)) .build(); clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper); }对于路由过来的规则链/** * 规则链路由到本节点的消息入口主处理路径 * 由 RuleChainActorMessageProcessor.pushToTarget() 通过 Actor tell 触发 * * 流程 * 1. 单例节点分区检查若本节点配置了 singletonMode 且当前服务不是该分区的负责者 * → 调用 putToNodePartition() 将消息序列化后重新入队路由到正确的服务实例 * 2. 若在本节点所在分区 * a. callback.onProcessingStart(info)记录当前节点为最后访问节点供超时日志使用 * b. checkComponentStateActive节点未激活初始化失败/已停止时直接 callback.onFailure * c. 统计本条消息已经过的 RuleNode 数量超过租户配置上限时直接 callback.onFailure * d. tbNode.onMsg(ctx, tbMsg)调用具体规则节点实现执行业务逻辑 * 节点实现完成后通过 ctx.tellSuccess / ctx.tellNext / ctx.tellFailure / ctx.ack 输出结果 * e. 若 tbNode.onMsg() 抛出异常调用 ctx.tellFailure() 向 FAILURE 链路由 */ void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { if (!isMyNodePartition()) { // 单例模式singletonModetrue且本实例不是负责分区 → 转发到正确分区 putToNodePartition(msg.getMsg()); } else { // 记录最后访问节点信息用于超时/失败时的日志诊断 msg.getMsg().getCallback().onProcessingStart(info); // 节点未就绪时立即失败不调用 tbNode checkComponentStateActive(msg.getMsg()); TbMsg tbMsg msg.getMsg(); // 累加节点执行计数防止消息在规则链中无限循环 int ruleNodeCount tbMsg.getAndIncrementRuleNodeCounter(); var tenantProfileConfiguration getTenantProfileConfiguration(); int maxRuleNodeExecutionsPerMessage tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); if (maxRuleNodeExecutionsPerMessage 0 || ruleNodeCount maxRuleNodeExecutionsPerMessage) { // 上报 API 用量统计用于配额控制 apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); // Debug 模式下持久化输入事件供规则链调试界面展示 persistDebugInputIfAllowed(msg.getMsg(), msg.getFromRelationType()); try { // 核心调用执行具体规则节点逻辑如 Save Timeseries / Filter / Transform 等 // 节点内部通过 ctx.tellSuccess/tellNext/tellFailure/ack 将处理结果回传给规则链 tbNode.onMsg(msg.getCtx(), msg.getMsg()); } catch (Exception e) { // 节点执行异常 → 路由到 FAILURE 链最终可能触发 callback.onFailure msg.getCtx().tellFailure(msg.getMsg(), e); } } else { // 超过最大节点执行次数直接标记消息失败防止死循环 tbMsg.getCallback().onFailure(new RuleNodeException(Message is processed by more than maxRuleNodeExecutionsPerMessage rule nodes!, ruleChainName, ruleNode)); } } }是别的分区节点处理的 转发到正确分区org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor#putToNodePartition/** * 将消息路由到正确服务实例上的节点分区单例节点跨服务转发 * * 仅在 singletonModetrue 且当前服务不负责该节点分区时触发。 * 典型场景集群部署中某个规则节点设为全局单例只在特定服务实例运行 * 其他实例收到消息时需将消息转发过去。 * 流程 * 1. 拷贝消息并将 ruleNodeId 设为本节点 ID确保目标服务路由到正确节点 * 2. 解析节点 ID 对应的 TPI目标分区该分区属于负责运行此单例节点的服务实例 * 3. 序列化并推送到 Rule Engine 队列目标实例消费后再次进入 onRuleChainToRuleNodeMsg * 4. ack(source)释放当前消息的 callback 计数原消息在本实例已完成由目标实例重新追踪 * // 消息处理完成后会重新路由回来参见 RuleChainActorMessageProcessor.pushToTarget */ private void putToNodePartition(TbMsg source) { // 拷贝消息将 ruleNodeId 记录为本节点目标实例据此找到对应 RuleNodeActor TbMsg tbMsg TbMsg.newMsg(source, source.getQueueName(), source.getRuleChainId(), entityId); // 以节点 ID 为 originator 解析分区得到负责运行该单例节点的服务实例的分区 TopicPartitionInfo tpi systemContext.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, ruleNode.getId()); TransportProtos.ToRuleEngineMsg toQueueMsg TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsgProto(TbMsg.toProto(tbMsg)) .build(); // 推送到队列由目标分区的服务实例消费处理 systemContext.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), toQueueMsg, null); // 原消息在本实例已完成流转转发出去调用 ack 释放 packCtx 计数 defaultCtx.ack(source); }对于路由过来的节点/** * 处理节点向自身发送的延迟消息ctx.tellSelf(msg, delayMs) * 常用于需要定时重试或延迟执行的场景如等待设备响应后重新评估 * 处理逻辑与 onRuleChainToRuleNodeMsg 相同但不需要分区检查必然在本节点 */ public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception { checkComponentStateActive(msg.getMsg()); TbMsg tbMsg msg.getMsg(); int ruleNodeCount tbMsg.getAndIncrementRuleNodeCounter(); var tenantProfileConfiguration getTenantProfileConfiguration(); int maxRuleNodeExecutionsPerMessage tenantProfileConfiguration.getMaxRuleNodeExecsPerMessage(); if (maxRuleNodeExecutionsPerMessage 0 || ruleNodeCount maxRuleNodeExecutionsPerMessage) { apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); persistDebugInputIfAllowed(msg.getMsg(), Self); try { tbNode.onMsg(defaultCtx, msg.getMsg()); } catch (Exception e) { defaultCtx.tellFailure(msg.getMsg(), e); } } else { tbMsg.getCallback().onFailure(new RuleNodeException(Message is processed by more than maxRuleNodeExecutionsPerMessage rule nodes!, ruleChainName, ruleNode)); } }对于 Debugr 模式的 保存调试数据org.thingsboard.server.actors.ruleChain.DefaultTbContext#tellNext(org.thingsboard.server.common.msg.TbMsg, java.util.Setjava.lang.String)/** * 规则节点核心输出方法将消息按指定关系类型路由到下游节点 * 由规则节点实现TbNode在完成业务逻辑后调用 * * 流程 * 1. persistDebugOutputDebug 模式下持久化输出事件供前端调试界面展示 * 2. callback.onProcessingEnd(ruleNodeId)通知 TbMsgPackProcessingContext 本节点处理完毕 * 更新最后访问节点信息用于超时日志注意此时消息并未计入完成仍在 pendingMap 中 * 3. tell RuleNodeToRuleChainTellNextMsg 到 RuleChainActor * 由 RuleChainActorMessageProcessor.onTellNext() 根据 relationTypes 查找下游节点并继续路由 * 若没有下游节点 → 触发 callback.onSuccess()消息从 pendingMap 移除packCtx 计数减一 */ Override public void tellNext(TbMsg msg, SetString relationTypes) { RuleNode ruleNode nodeCtx.getSelf(); persistDebugOutput(msg, relationTypes); // 标记当前节点处理完毕更新诊断信息不触发 packCtx 计数 msg.getCallback().onProcessingEnd(ruleNode.getId()); // 将路由决策交还给规则链 Actor由其负责查找并分发到下游节点 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(ruleNode.getRuleChainId(), ruleNode.getId(), relationTypes, msg, null)); }内存和Kafka队列关于 QueueFactory 选择由Spring 启动时注入Factory条件InMemoryMonolithQueueFactoryqueue.typein-memoryservice.typemonolithKafkaMonolithQueueFactoryqueue.typekafkaservice.typemonolithKafkaTbRuleEngineQueueFactoryqueue.typekafkaservice.typetb-rule-engine自定义节点https://thingsboard.io/docs/user-guide/contribution/rule-node-development/核心概念在 ThingsBoard 中每条数据 → 被包装成TbMsg每个节点 → 处理一条消息输出 → 发送给下一个节点带 relationThingsBoard类比Rule Chain工作流 DAGRule Node函数节点TbMsg消息对象relationType分支条件节点生命周期init() → 初始化解析配置 onMsg() → 处理消息核心写逻辑的地方 destroy() → 销毁核心逻辑onMsg真正写业务逻辑的地方Override public void onMsg(TbContext ctx, TbMsg msg) { // 处理 msg }ThingsBoard 不会自动继续流程必须手动ctx.tellSuccess(msg);或者ctx.tellNext(msg, True);另外, 还有两种流转ctx.tellSelf// 只用于延迟一段时间后叫醒自己1个/节点(单实例只有一个)ctx.enqueueForTellNext//它走的是持久化的消息队列Kafka/内存队列通常用于触发下游节点N个/触发批次(与设备数相关但异步消费)节点定义RuleNode 注解这是“声明元数据”类似前端组件注册/** * Copyright © 2016-2026 The Thingsboard Authors * * Licensed under the Apache License, Version 2.0 (the License); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an AS IS BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.thingsboard.rule.engine.api; import org.thingsboard.server.common.data.msg.TbNodeConnectionType; import org.thingsboard.server.common.data.plugin.ComponentClusteringMode; import org.thingsboard.server.common.data.plugin.ComponentScope; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; Retention(RetentionPolicy.RUNTIME) Target(ElementType.TYPE) public interface RuleNode { ComponentType type(); String name(); String nodeDescription(); String nodeDetails(); Class? extends NodeConfiguration configClazz(); ComponentClusteringMode clusteringMode() default ComponentClusteringMode.ENABLED; boolean hasQueueName() default false; boolean inEnabled() default true; boolean outEnabled() default true; ComponentScope scope() default ComponentScope.TENANT; String[] relationTypes() default {TbNodeConnectionType.SUCCESS, TbNodeConnectionType.FAILURE}; String[] uiResources() default {}; String configDirective() default ; String icon() default ; String iconUrl() default ; String docUrl() default ; boolean customRelations() default false;//自定义类型分支, 否则只有 Success 或者 Failual boolean ruleChainNode() default false; RuleChainType[] ruleChainTypes() default {RuleChainType.CORE, RuleChainType.EDGE}; int version() default 0; }