ThingsBoard源码解析-数据订阅与规则链数据处理
- thingsboard
- 时间:2023-05-04 18:11
- 3924人已阅读
🔔🔔🔔好消息!好消息!🔔🔔🔔
有需要的朋友👉:联系凯哥
前言
//org.thingsboard.server.transport.mqtt.MqttTransportHandler void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) { switch (msg.fixedHeader().messageType()) { case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: if (checkConnected(ctx, msg)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } break; case DISCONNECT: ctx.close(); break; case PUBACK: int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId(); TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId); if (rpcRequest != null) { transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); } break; default: break; } }
//org.thingsboard.server.transport.mqtt.MqttTransportHandler private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { if (!checkConnected(ctx, mqttMsg)) { return; } String topicName = mqttMsg.variableHeader().topicName(); int msgId = mqttMsg.variableHeader().packetId(); log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId); if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) { //消息来源为网关主题 if (gatewaySessionHandler != null) { handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg); transportService.reportActivity(deviceSessionCtx.getSessionInfo()); } } else { //处理设备的消息,重点 processDevicePublish(ctx, mqttMsg, topicName, msgId); } }
//org.thingsboard.server.transport.mqtt.MqttTransportHandler
/**
 * Routes a device PUBLISH message by topic: the payload is converted into the matching
 * {@code TransportProtos} message and handed to {@code transportService.process(...)} together
 * with a PUBACK callback. Unknown topics only report activity and are acknowledged.
 * If payload conversion fails with an {@link AdaptorException}, the channel is closed.
 *
 * @param ctx       channel context the message arrived on
 * @param mqttMsg   the PUBLISH message
 * @param topicName topic taken from the message's variable header
 * @param msgId     packet id taken from the message's variable header
 */
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
    try {
        Matcher fwMatcher;
        // Adaptor configured for this device session; the explicit JSON/PROTO short topics below bypass it.
        MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
        if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
            // Attribute upload
            TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
        } else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
            // Telemetry upload
            TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
            // Attribute request; remembers which topic flavour the device used
            TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
            transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
            attrReqTopicType = TopicType.V1;
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
            // Device response to a server-side RPC
            TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
            // Client-side RPC request to the server; remembers which topic flavour the device used
            TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
            toServerRpcSubTopicType = TopicType.V1;
        } else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
            // Device claiming
            TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
        } else if ((fwMatcher = FW_REQUEST_PATTERN.matcher(topicName)).find()) {
            // Firmware OTA package request
            getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);
        } else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {
            // Software OTA package request
            getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
        } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
            // Short-topic telemetry: session adaptor, then explicit JSON and PROTO variants
            TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
        } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
            TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
        } else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
            TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
        } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
            // Short-topic attribute upload: session adaptor, then explicit JSON and PROTO variants
            TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
        } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
            TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
        } else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
            TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
            transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
            // Short-topic RPC responses. The JSON/PROTO prefixes are tested before the generic short prefix.
            // NOTE(review): presumably because the generic prefix also matches the specific ones - confirm against MqttTopics.
            TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {
            TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {
            TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {
            // Short-topic client-side RPC requests; each branch records the topic flavour used
            TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
            toServerRpcSubTopicType = TopicType.V2_JSON;
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {
            TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
            toServerRpcSubTopicType = TopicType.V2_PROTO;
        } else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {
            TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);
            transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
            toServerRpcSubTopicType = TopicType.V2;
        } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {
            // Short-topic attribute requests; each branch records the topic flavour used
            TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
            transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
            attrReqTopicType = TopicType.V2_JSON;
        } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {
            TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
            transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
            attrReqTopicType = TopicType.V2_PROTO;
        } else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {
            TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
            transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
            attrReqTopicType = TopicType.V2;
        } else {
            // Unknown topic: still counts as activity and is acknowledged
            transportService.reportActivity(deviceSessionCtx.getSessionInfo());
            ack(ctx, msgId);
        }
    } catch (AdaptorException e) {
        // A payload that cannot be converted terminates the session
        log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
        log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
        ctx.close();
    }
}
这里根据消息来源主题的不同,进行对应的处理
顺着属性消息处理继续往下看
//org.thingsboard.server.common.transport.service.DefaultTransportService @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) { if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) { //更新活动时间 reportActivityInternal(sessionInfo); TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); //获取键值对 JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); //构造元数据 TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("notifyDevice", "false"); CustomerId customerId = getCustomerId(sessionInfo); //发送至规则引擎 sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); } }
//org.thingsboard.server.common.transport.service.DefaultTransportService private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) { //创建设备配置标识 DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); //从缓存中获取设备配置 DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); RuleChainId ruleChainId; String queueName; if (deviceProfile == null) { //无设备配置,使用默认的规则链和队列名 log.warn("[{}] Device profile is null!", deviceProfileId); ruleChainId = null; queueName = ServiceQueue.MAIN; } else { //获取规则链标识 ruleChainId = deviceProfile.getDefaultRuleChainId(); //获取队列名 String defaultQueueName = deviceProfile.getDefaultQueueName(); queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN; } //创建消息 TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null); //发送至规则引擎 sendToRuleEngine(tenantId, tbMsg, callback); }
//org.thingsboard.server.common.transport.service.DefaultTransportService private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { //获取分区信息 TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator()); if (log.isTraceEnabled()) { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); } //创建消息 ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); //统计数增加 ruleEngineProducerStats.incrementTotal(); //统计相关回调 StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats); //发送至规则消息引擎队列 ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); }
// Queue producer used by sendToRuleEngine(...) to publish ToRuleEngineMsg payloads.
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
package org.thingsboard.server.queue;

import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;

import java.util.List;
import java.util.Set;

/**
 * Consumer side of a message queue carrying messages of type {@code T}.
 * NOTE(review): the contracts below are inferred from the method names and from how the
 * rule-engine consumer loop uses this interface - confirm against the implementations.
 */
public interface TbQueueConsumer<T extends TbQueueMsg> {

    // Topic this consumer is bound to.
    String getTopic();

    // Subscribes without an explicit partition set.
    void subscribe();

    // Subscribes to the given partitions.
    void subscribe(Set<TopicPartitionInfo> partitions);

    void unsubscribe();

    // Returns the polled messages; presumably waits up to durationInMillis - TODO confirm.
    List<T> poll(long durationInMillis);

    // Called by the consumer loop once a polled batch has been fully handled.
    void commit();

    // Checked by the consumer loop as a termination condition.
    boolean isStopped();

}
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService
/**
 * Main loop of a rule-engine queue consumer thread. Polls a batch of messages, submits them
 * through the configured submit strategy, waits for the pack to finish (or time out), and asks
 * the processing (ack) strategy whether to commit or to resubmit a subset. The consumer offset
 * is committed only after the inner loop exits.
 *
 * @param consumer      queue consumer to poll from
 * @param configuration queue configuration (name, strategies, pack processing timeout)
 * @param stats         statistics sink for processing results
 * @param threadSuffix  suffix used to rename the current thread
 */
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
    updateCurrentThreadName(threadSuffix);
    while (!stopped && !consumer.isStopped()) {
        try {
            // Poll a batch of messages
            List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
            if (msgs.isEmpty()) {
                continue;
            }
            // Submit strategy for this queue
            final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
            // Processing (acknowledge) strategy for this queue
            final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
            // Seed the submit strategy with the polled batch
            submitStrategy.init(msgs);
            while (!stopped) {
                // Fresh processing context for every submit attempt
                TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
                // Submit the messages; the actual work happens in submitMessage on submitExecutor
                submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));
                // Wait for the pack to complete, bounded by the configured timeout
                final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
                // Capture the outcome of this attempt
                TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
                if (timeout) {
                    // Log messages that were still pending when the timeout hit
                    printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
                }
                if (!ctx.getFailedMap().isEmpty()) {
                    // Log failed messages
                    printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
                }
                // Print profiler statistics
                ctx.printProfilerStats();
                // Let the processing strategy decide: commit, or reprocess some messages
                TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
                if (statsEnabled) {
                    // Record the result together with the commit decision
                    stats.log(result, decision.isCommit()); 
                }
                // Release the context
                ctx.cleanup();
                if (decision.isCommit()) {
                    // Done with this batch: stop the submit strategy and leave the retry loop
                    submitStrategy.stop();
                    break;
                } else {
                    // Not done: feed the messages to reprocess back into the submit strategy and try again
                    submitStrategy.update(decision.getReprocessMap());
                }
            }
            // Commit on the consumer after the batch has been handled
            consumer.commit();
        } catch (Exception e) {
            if (!stopped) {
                log.warn("Failed to process messages from queue.", e);
                try {
                    // Back off for one poll interval before retrying
                    Thread.sleep(pollDuration);
                } catch (InterruptedException e2) {
                    // NOTE(review): the interrupt flag is not restored here - confirm this is intentional.
                    log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                }
            }
        }
    }
    log.info("TB Rule Engine Consumer stopped.");
}
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService void submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) { log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue()); //获取原始消息 ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); //获取租户标识 TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); //创建回调 TbMsgCallback callback = prometheusStatsEnabled ? new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : new TbMsgPackCallback(id, tenantId, ctx); try { if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { //转发至规则引擎 Actor forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); } else { //消息为空直接回调成功方法 callback.onSuccess(); } } catch (Exception e) { //回调失败方法 callback.onFailure(new RuleEngineException(e.getMessage())); } }
//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { //构建消息 TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback); QueueToRuleEngineMsg msg; //获取关联类型列表 ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList(); Set<String> relationTypes = null; if (relationTypesList != null) { if (relationTypesList.size() == 1) { relationTypes = Collections.singleton(relationTypesList.get(0)); } else { relationTypes = new HashSet<>(relationTypesList); } } //创建消息 msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage()); //使用 Actor 系统上下文发送消息 actorContext.tell(msg); }
package org.thingsboard.server.common.msg.queue;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;

import java.util.Set;

/**
 * Actor message that carries a {@link TbMsg} taken from the rule-engine queue, together with the
 * owning tenant, optional relation types and an optional failure message.
 *
 * Created by ashvayka on 15.03.18.
 */
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {

    @Getter
    private final TenantId tenantId;
    @Getter
    private final Set<String> relationTypes;
    @Getter
    private final String failureMessage;

    public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
        super(tbMsg);
        this.tenantId = tenantId;
        this.relationTypes = relationTypes;
        this.failureMessage = failureMessage;
    }

    @Override
    public MsgType getMsgType() {
        return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
    }

    /**
     * Fails the wrapped message's callback with a text that depends on the stop reason and on
     * whether the message names a rule chain.
     */
    @Override
    public void onTbActorStopped(TbActorStopReason reason) {
        final boolean stopped = reason == TbActorStopReason.STOPPED;
        final String message;
        if (msg.getRuleChainId() == null) {
            message = stopped ? "Rule chain stopped" : "Failed to initialize rule chain!";
        } else {
            String template = stopped ? "Rule chain [%s] stopped" : "Failed to initialize rule chain [%s]!";
            message = String.format(template, msg.getRuleChainId().getId());
        }
        msg.getCallback().onFailure(new RuleEngineException(message));
    }

    /** Returns {@code true} when at least one relation type is present. */
    public boolean isTellNext() {
        return !(relationTypes == null || relationTypes.isEmpty());
    }

}
得知消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG,后面会用到
接着我们看Actor系统对消息的处理
//org.thingsboard.server.actors.ActorSystemContext public void tell(TbActorMsg tbActorMsg) { appActor.tell(tbActorMsg); }
appActor 为应用Actor,是整个Actor系统的根Actor,感兴趣可以自行阅读
根据之前的学习,我们了解到Actor的处理方法为boolean process(TbActorMsg msg)
//org.thingsboard.server.actors.service.ContextAwareActor @Override public boolean process(TbActorMsg msg) { if (log.isDebugEnabled()) { log.debug("Processing msg: {}", msg); } //处理消息 if (!doProcess(msg)) { log.warn("Unprocessed message: {}!", msg); } return false; }
//org.thingsboard.server.actors.app.AppActor @Override protected boolean doProcess(TbActorMsg msg) { if (!ruleChainsInitialized) { //规则链未初始化 //初始化租户 Actor initTenantActors(); ruleChainsInitialized = true; if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) { log.warn("Rule Chains initialized by unexpected message: {}", msg); } } //判断消息类型 switch (msg.getMsgType()) { case APP_INIT_MSG: break; case PARTITION_CHANGE_MSG: ctx.broadcastToChildren(msg); break; case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); break; case QUEUE_TO_RULE_ENGINE_MSG: onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case TRANSPORT_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((TenantAwareMsg) msg, false); break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG: case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG: case REMOVE_RPC_TO_DEVICE_ACTOR_MSG: onToDeviceActorMsg((TenantAwareMsg) msg, true); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: onToTenantActorMsg((EdgeEventUpdateMsg) msg); break; case SESSION_TIMEOUT_MSG: ctx.broadcastToChildrenByType(msg, EntityType.TENANT); break; default: return false; } return true; }
//org.thingsboard.server.actors.app.AppActor private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { //消息来自系统,视为异常 msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); } else { if (!deletedTenants.contains(msg.getTenantId())) { //租户未被删除 //获取或创建租户 Actor 并发送消息 getOrCreateTenantActor(msg.getTenantId()).tell(msg); } else { //租户已删除,直接回调成功方法 msg.getMsg().getCallback().onSuccess(); } } }
//org.thingsboard.server.actors.app.AppActor private TbActorRef getOrCreateTenantActor(TenantId tenantId) { return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId)); }
//org.thingsboard.server.actors.tenant.TenantActor
/**
 * Message dispatcher of the tenant actor. While {@code cantFindTenant} is set, queue and
 * transport messages are acknowledged and everything is reported as handled; otherwise the
 * message is routed by its type.
 *
 * @return {@code true} when the message was handled, {@code false} for unknown message types
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    if (cantFindTenant) {
        // Tenant could not be found
        log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
        if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
            QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
            // Acknowledge directly
            queueMsg.getMsg().getCallback().onSuccess();
        } else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
            TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
            // Acknowledge directly
            transportMsg.getCallback().onSuccess();
        }
        return true;
    }
    switch (msg.getMsgType()) {
        case PARTITION_CHANGE_MSG:
            PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;
            ServiceType serviceType = partitionChangeMsg.getServiceQueueKey().getServiceType();
            if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
                //To Rule Chain Actors
                broadcast(msg);
            } else if (ServiceType.TB_CORE.equals(serviceType)) {
                // Stop device actors whose entity no longer passes isMyPartition
                List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
                    @Override
                    protected boolean testEntityId(EntityId entityId) {
                        return super.testEntityId(entityId) && !isMyPartition(entityId);
                    }
                });
                deviceActorIds.forEach(id -> ctx.stop(id));
            }
            break;
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((DeviceAwareMsg) msg, false);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((DeviceAwareMsg) msg, true);
            break;
        case SESSION_TIMEOUT_MSG:
            ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
            break;
        case RULE_CHAIN_INPUT_MSG:
        case RULE_CHAIN_OUTPUT_MSG:
        case RULE_CHAIN_TO_RULE_CHAIN_MSG:
            onRuleChainMsg((RuleChainAwareMsg) msg);
            break;
        case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
            onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
            break;
        default:
            return false;
    }
    return true;
}
//org.thingsboard.server.actors.tenant.TenantActor private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { //检查当前服务是否为规则引擎服务 if (!isRuleEngine) { log.warn("RECEIVED INVALID MESSAGE: {}", msg); return; } TbMsg tbMsg = msg.getMsg(); //检查规则引擎是否启用状态 if (getApiUsageState().isReExecEnabled()) { //判断规则链标识是否为空 if (tbMsg.getRuleChainId() == null) { //获取根链 Actor 并判断是否为空 if (getRootChainActor() != null) { //向根链 Actor 发送消息 getRootChainActor().tell(msg); } else { //无根链 Actor ,回调失败方法 tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!")); log.info("[{}] No Root Chain: {}", tenantId, msg); } } else { try { //向指定规则链发送消息 ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg); } catch (TbActorNotRegisteredException ex) { log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId()); //TODO: 3.1 Log it to dead letters queue; tbMsg.getCallback().onSuccess(); } } } else { log.trace("[{}] Ack message because Rule Engine is disabled", tenantId); tbMsg.getCallback().onSuccess(); } }
//org.thingsboard.server.actors.ruleChain.RuleChainActor @Override protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); break; case QUEUE_TO_RULE_ENGINE_MSG: processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); break; case RULE_CHAIN_TO_RULE_CHAIN_MSG: processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg); break; case RULE_CHAIN_INPUT_MSG: processor.onRuleChainInputMsg((RuleChainInputMsg) msg); break; case RULE_CHAIN_OUTPUT_MSG: processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg); break; case PARTITION_CHANGE_MSG: processor.onPartitionChangeMsg((PartitionChangeMsg) msg); break; case STATS_PERSIST_TICK_MSG: onStatsPersistTick(id); break; default: return false; } return true; }
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) { TbMsg msg = envelope.getMsg(); //验证消息 if (!checkMsgValid(msg)) { return; } log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg); //判断是否包含关联类型 if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) { onTellNext(msg, true); } else { onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage()); } }
结合规则链的结构联想一下,规则链本质是从上个节点传递到下个节点,节点传递之间有什么是可有可无的?
显然关联类型就是节点流转的条件
由于首节点是没有条件的,因此在构造消息时没有设置关联类型
注:每条规则链仅有一个首节点,且除首节点外的其他节点至少存在一个关联类型(TbRelationTypes.FAILURE),这部分感兴趣自行研究
先看没有关联类型的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) { try { //检查组件(节点)状态是否正常 checkComponentStateActive(msg); //获取节点标识 RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null; RuleNodeCtx targetCtx; if (targetId == null) { //未指定目标节点 //将当前规则链的首节点作为目标 targetCtx = firstNode; //拷贝消息,entityId即当前规则链的标识 msg = msg.copyWithRuleChainId(entityId); } else { //已指定目标节点,获取节点上下文 targetCtx = nodeActors.get(targetId); } //判断上下文是否存在 if (targetCtx != null) { log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId); //推送至节点 pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE); } else { log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId); msg.getCallback().onSuccess(); } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { msg.getCallback().onFailure(new RuleEngineException(e.getMessage())); } }
package org.thingsboard.server.actors.ruleChain;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;

/**
 * Per-node context kept by a rule chain: the owning tenant, the chain's actor, the node's own
 * actor and the node definition. Only {@code self} is non-final.
 *
 * Created by ashvayka on 19.03.18.
 */
@Data
@AllArgsConstructor
final class RuleNodeCtx {
    // Tenant the node belongs to
    private final TenantId tenantId;
    // Actor of the rule chain containing the node
    private final TbActorRef chainActor;
    // Actor of the node itself
    private final TbActorRef selfActor;
    // Node definition; the only non-final field
    private RuleNode self;
}
很简单的结构,记录了租户标识,规则链Actor,自身节点Actor和自身节点
继续
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) { if (nodeCtx != null) { //创建消息并告知自身节点 nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType)); } else { log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName); msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty")); } }
与QueueToRuleEngineMsg类似,查看可知RuleChainToRuleNodeMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_MSG,后面同样以此为处理条件
终于到节点的Actor了
//org.thingsboard.server.actors.ruleChain.RuleNodeActor @Override protected boolean doProcess(TbActorMsg msg) { switch (msg.getMsgType()) { case COMPONENT_LIFE_CYCLE_MSG: case RULE_NODE_UPDATED_MSG: onComponentLifecycleMsg((ComponentLifecycleMsg) msg); break; case RULE_CHAIN_TO_RULE_MSG: onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg); break; case RULE_TO_SELF_MSG: onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg); break; case STATS_PERSIST_TICK_MSG: onStatsPersistTick(id); break; case PARTITION_CHANGE_MSG: onClusterEventMsg((PartitionChangeMsg) msg); break; default: return false; } return true; }
//org.thingsboard.server.actors.ruleChain.RuleNodeActor private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) { TbMsg msg = envelope.getMsg(); //验证消息 if (!msg.isValid()) { if (log.isTraceEnabled()) { log.trace("Skip processing of message: {} because it is no longer valid!", msg); } return; } if (log.isDebugEnabled()) { log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg); } try { //处理消息 processor.onRuleChainToRuleNodeMsg(envelope); //增加处理计数 increaseMessagesProcessedCount(); } catch (Exception e) { logAndPersist("onRuleMsg", e); } }
//org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception { //回调处理开始通知 msg.getMsg().getCallback().onProcessingStart(info); //检查组件状态可用 checkComponentStateActive(msg.getMsg()); TbMsg tbMsg = msg.getMsg(); //获取规则节点计数 int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter(); //获取消息的最大规则节点执行次数 int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage(); //判断执行次数是否超限 if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) { //上报规则引擎执行计数 apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT); if (ruleNode.isDebugMode()) { systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType()); } try { //执行节点处理方法 tbNode.onMsg(msg.getCtx(), msg.getMsg()); } catch (Exception e) { msg.getCtx().tellFailure(msg.getMsg(), e); } } else { tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode)); } }
这里仅调用了tbNode的onMsg方法,那么节点的流转呢?
猜想所有节点继承自一个公共的父抽象类,该类中实现了节点的流转
那么看一下继承关系
这些节点并没有继承公共的父类,流转方法没有抽离出来?
随便找一个节点看看,这里我看的是TbMsgTypeFilterNode
//org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode @Override public void onMsg(TbContext ctx, TbMsg msg) { ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False"); }
//org.thingsboard.server.actors.ruleChain.DefaultTbContext @Override public void tellSuccess(TbMsg msg) { tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null); } @Override public void tellNext(TbMsg msg, String relationType) { tellNext(msg, Collections.singleton(relationType), null); } @Override public void tellNext(TbMsg msg, Set<String> relationTypes) { tellNext(msg, relationTypes, null); }
这里同时贴出了tellSuccess方法和另一个tellNext重载方法,它们也会被其他节点使用
//org.thingsboard.server.actors.ruleChain.DefaultTbContext private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) { if (nodeCtx.getSelf().isDebugMode()) { relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); } //回调处理结束通知 msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); //像规则链 Actor 发送消息 nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); }
和前面一样,查看RuleNodeToRuleChainTellNextMsg的消息类型为MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG
接下来又回到了规则链Actor的doProcess方法,找到对应的处理方法
//org.thingsboard.server.actors.ruleChain.RuleChainActor case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg); break;
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) { var msg = envelope.getMsg(); if (checkMsgValid(msg)) { onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage()); } }
这里调用的onTellNext方法即前面onQueueToRuleEngineMsg方法中根据关联类型通知的方法
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) { try { checkComponentStateActive(msg); EntityId entityId = msg.getOriginator(); //获取主题分区信息 TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId); //根据来源节点标识获取关联(后续节点的指向)列表 List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId); if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId()); ruleNodeRelations = Collections.emptyList(); } //根据指定的关联类型筛选关联 List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream() .filter(r -> contains(relationTypes, r.getType())) .collect(Collectors.toList()); //获取关联个数 int relationsCount = relationsByTypes.size(); if (relationsCount == 0) { //没有后续节点了 log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId()); //判断当前关联是否包含失败(上个节点是否执行失败) if (relationTypes.contains(TbRelationTypes.FAILURE)) { //获取规则节点上下文 RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId); if (ruleNodeCtx != null) { //回调消息的失败方法 msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf())); } else { log.debug("[{}] Failure during message processing by Rule Node [{}]. 
Enable and see debug events for more info", entityId, originatorNodeId.getId()); //回调消息的失败方法 msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]")); } } else { //回调消息的成功方法 msg.getCallback().onSuccess(); } } else if (relationsCount == 1) { //后续仅一个节点 //此处循环仅执行一次 for (RuleNodeRelation relation : relationsByTypes) { log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut()); //推送给目标 pushToTarget(tpi, msg, relation.getOut(), relation.getType()); } } else { //后续多个节点 MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback()); log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes); //遍历关联列表 for (RuleNodeRelation relation : relationsByTypes) { EntityId target = relation.getOut(); //推送至队列 putToQueue(tpi, msg, callbackWrapper, target); } } } catch (RuleNodeException rne) { msg.getCallback().onFailure(rne); } catch (Exception e) { log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e); msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage())); } }
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) { switch (target.getEntityType()) { case RULE_NODE: putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper); break; case RULE_CHAIN: putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper); break; } }
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) { //构建消息 ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(newMsg)) .build(); //推送至规则引擎 clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper); }
//org.thingsboard.server.service.queue.DefaultTbClusterService @Override public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { log.trace("PUSHING msg: {} to:{}", msg, tpi); //获取规则引擎消息生产端,发送消息 producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); toRuleEngineMsgs.incrementAndGet(); }
消息发送至队列,消费端接收到消息后,根据关联类型再次调用onTellNext方法处理,此时的消息仅指向单个后续节点
最后查看单个后续节点的处理
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) { //判断是否为当前服务负责的分区 if (tpi.isMyPartition()) { //判断实体类型 switch (target.getEntityType()) { case RULE_NODE: //推送至节点 pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType); break; case RULE_CHAIN: //通知父 Actor parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType)); break; } } else { //放入队列,交给负责的服务 putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target); } }
注:集群部署下,每个服务负责若干分区
pushMsgToNode方法和putToQueue方法前面都已经看过了,重点看跳转规则链
和前面一样,查看RuleChainToRuleChainMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG
接着查看租户Actor中对应的处理方法
//org.thingsboard.server.actors.tenant.TenantActor private void onRuleChainMsg(RuleChainAwareMsg msg) { if (getApiUsageState().isReExecEnabled()) { getOrCreateActor(msg.getRuleChainId()).tell(msg); } }
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) { var msg = envelope.getMsg(); if (!checkMsgValid(msg)) { return; } try { checkComponentStateActive(envelope.getMsg()); if (firstNode != null) { pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType()); } else { envelope.getMsg().getCallback().onSuccess(); } } catch (RuleNodeException e) { log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId); } }
上一篇: 规则引擎解析