ThingsBoard源码解析-数据订阅与规则链数据处理

  • 作者: 凯哥Java(公众号:凯哥Java)
  • thingsboard
  • 时间:2023-05-04 18:11
  • 3924人已阅读
简介 前言结合本篇对规则链的执行过程进行探讨根据之前对MQTT源码的学习,我们由消息的处理入手//org.thingsboard.server.transport.mqtt.MqttTransportHandler void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage m

🔔🔔🔔好消息!好消息!🔔🔔🔔

有需要的朋友👉:联系凯哥 微信号 kaigejava2022

前言

结合本篇对规则链的执行过程进行探讨
根据之前对MQTT源码的学习,我们由消息的处理入手

//org.thingsboard.server.transport.mqtt.MqttTransportHandler
 
void processRegularSessionMsg(ChannelHandlerContext ctx, MqttMessage msg) {
	switch (msg.fixedHeader().messageType()) {
		case PUBLISH:
			processPublish(ctx, (MqttPublishMessage) msg);
			break;
		case SUBSCRIBE:
			processSubscribe(ctx, (MqttSubscribeMessage) msg);
			break;
		case UNSUBSCRIBE:
			processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
			break;
		case PINGREQ:
			if (checkConnected(ctx, msg)) {
				ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
				transportService.reportActivity(deviceSessionCtx.getSessionInfo());
			}
			break;
		case DISCONNECT:
			ctx.close();
			break;
		case PUBACK:
			int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
			TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
			if (rpcRequest != null) {
				transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
			}
			break;
		default:
			break;
        }
    }

接着看对发布消息的处理

//org.thingsboard.server.transport.mqtt.MqttTransportHandler
 
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) {
	if (!checkConnected(ctx, mqttMsg)) {
		return;
	}
	String topicName = mqttMsg.variableHeader().topicName();
	int msgId = mqttMsg.variableHeader().packetId();
	log.trace("[{}][{}] Processing publish msg [{}][{}]!", sessionId, deviceSessionCtx.getDeviceId(), topicName, msgId);
 
	if (topicName.startsWith(MqttTopics.BASE_GATEWAY_API_TOPIC)) {
		//消息来源为网关主题
		if (gatewaySessionHandler != null) {
			handleGatewayPublishMsg(ctx, topicName, msgId, mqttMsg);
			transportService.reportActivity(deviceSessionCtx.getSessionInfo());
		}
	} else {
		//处理设备的消息,重点
		processDevicePublish(ctx, mqttMsg, topicName, msgId);
	}
}

继续

//org.thingsboard.server.transport.mqtt.MqttTransportHandler
 
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
	try {
		Matcher fwMatcher;
		MqttTransportAdaptor payloadAdaptor = deviceSessionCtx.getPayloadAdaptor();
		if (deviceSessionCtx.isDeviceAttributesTopic(topicName)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (deviceSessionCtx.isDeviceTelemetryTopic(topicName)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V1;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V1;
		} else if (topicName.equals(MqttTopics.DEVICE_CLAIM_TOPIC)) {
			TransportProtos.ClaimDeviceMsg claimDeviceMsg = payloadAdaptor.convertToClaimDevice(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(ctx, msgId, claimDeviceMsg));
		} else if ((fwMatcher = FW_REQUEST_PATTERN.matcher(topicName)).find()) {
			getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.FIRMWARE);
		} else if ((fwMatcher = SW_REQUEST_PATTERN.matcher(topicName)).find()) {
			getOtaPackageCallback(ctx, mqttMsg, msgId, fwMatcher, OtaPackageType.SOFTWARE);
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = payloadAdaptor.convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_JSON_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getJsonMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_SHORT_PROTO_TOPIC)) {
			TransportProtos.PostTelemetryMsg postTelemetryMsg = context.getProtoMqttAdaptor().convertToPostTelemetry(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(ctx, msgId, postTelemetryMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = payloadAdaptor.convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_JSON_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = context.getJsonMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_SHORT_PROTO_TOPIC)) {
			TransportProtos.PostAttributeMsg postAttributeMsg = context.getProtoMqttAdaptor().convertToPostAttributes(deviceSessionCtx, mqttMsg);
			transportService.process(deviceSessionCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(ctx, msgId, postAttributeMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getJsonMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_JSON_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = context.getProtoMqttAdaptor().convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_PROTO_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC)) {
			TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = payloadAdaptor.convertToDeviceRpcResponse(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_RESPONSE_SHORT_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(ctx, msgId, rpcResponseMsg));
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getJsonMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_JSON_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2_JSON;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = context.getProtoMqttAdaptor().convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_PROTO_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2_PROTO;
		} else if (topicName.startsWith(MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC)) {
			TransportProtos.ToServerRpcRequestMsg rpcRequestMsg = payloadAdaptor.convertToServerRpcRequest(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_RPC_REQUESTS_SHORT_TOPIC);
			transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequestMsg, getPubAckCallback(ctx, msgId, rpcRequestMsg));
			toServerRpcSubTopicType = TopicType.V2;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getJsonMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_JSON_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2_JSON;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = context.getProtoMqttAdaptor().convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_PROTO_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2_PROTO;
		} else if (topicName.startsWith(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX)) {
			TransportProtos.GetAttributeRequestMsg getAttributeMsg = payloadAdaptor.convertToGetAttributes(deviceSessionCtx, mqttMsg, MqttTopics.DEVICE_ATTRIBUTES_REQUEST_SHORT_TOPIC_PREFIX);
			transportService.process(deviceSessionCtx.getSessionInfo(), getAttributeMsg, getPubAckCallback(ctx, msgId, getAttributeMsg));
			attrReqTopicType = TopicType.V2;
		} else {
			transportService.reportActivity(deviceSessionCtx.getSessionInfo());
			ack(ctx, msgId);
		}
	} catch (AdaptorException e) {
		log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
		log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
		ctx.close();
	}
}

这里根据消息来源主题的不同,进行对应的处理

顺着属性消息处理继续往下看

//org.thingsboard.server.common.transport.service.DefaultTransportService
 
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
	if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) {
		//更新活动时间
		reportActivityInternal(sessionInfo);
		TenantId tenantId = getTenantId(sessionInfo);
		DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
		//获取键值对
		JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
		//构造元数据
		TbMsgMetaData metaData = new TbMsgMetaData();
		metaData.putValue("deviceName", sessionInfo.getDeviceName());
		metaData.putValue("deviceType", sessionInfo.getDeviceType());
		metaData.putValue("notifyDevice", "false");
		CustomerId customerId = getCustomerId(sessionInfo);
		//发送至规则引擎
		sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, 
				new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
	}
}

继续

//org.thingsboard.server.common.transport.service.DefaultTransportService
 
private void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, CustomerId customerId, TransportProtos.SessionInfoProto sessionInfo, JsonObject json, 
							TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) {
	//创建设备配置标识
	DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
	//从缓存中获取设备配置
	DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
	RuleChainId ruleChainId;
	String queueName;
 
	if (deviceProfile == null) {
		//无设备配置,使用默认的规则链和队列名
		log.warn("[{}] Device profile is null!", deviceProfileId);
		ruleChainId = null;
		queueName = ServiceQueue.MAIN;
	} else {
		//获取规则链标识
		ruleChainId = deviceProfile.getDefaultRuleChainId();
		//获取队列名
		String defaultQueueName = deviceProfile.getDefaultQueueName();
		queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
	}
 
	//创建消息
	TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
	//发送至规则引擎
	sendToRuleEngine(tenantId, tbMsg, callback);
}

继续

//org.thingsboard.server.common.transport.service.DefaultTransportService
 
private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
	
	//获取分区信息
	TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
	if (log.isTraceEnabled()) {
		log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
	}
	//创建消息
	ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg))
			.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
			.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
	//统计数增加
	ruleEngineProducerStats.incrementTotal();
	//统计相关回调
	StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats);
	//发送至规则消息引擎队列
	ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback);
}

接下来,我们需要去消费端查看后续的处理:‘

protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;

先找到TbQueueProducer的位置

506817d226cc10dd0dcc4b556f7b6314.png

接着从TbQueueConsumer入手

package org.thingsboard.server.queue;
 
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
 
import java.util.List;
import java.util.Set;
 
public interface TbQueueConsumer<T extends TbQueueMsg> {
 
    String getTopic();
 
    void subscribe();
 
    void subscribe(Set<TopicPartitionInfo> partitions);
 
    void unsubscribe();
 
    List<T> poll(long durationInMillis);
 
    void commit();
 
    boolean isStopped();
 
}

重点关注拉取方法List<T> poll(long durationInMillis);,看看在哪些地方被调用

09face09e2afd8eb43e63f52cb266df5.png

发现目标DefaultTbRuleEngineConsumerService

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService
 
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
	updateCurrentThreadName(threadSuffix);
	while (!stopped && !consumer.isStopped()) {
		try {
			//拉取消息
			List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
			if (msgs.isEmpty()) {
				continue;
			}
			//获取提交策略
			final TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(configuration);
			//获取确认策略
			final TbRuleEngineProcessingStrategy ackStrategy = getAckStrategy(configuration);
			//初始化提交策略
			submitStrategy.init(msgs);
			while (!stopped) {
				//创建处理上下文
				TbMsgPackProcessingContext ctx = new TbMsgPackProcessingContext(configuration.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs());
				//提交,重点为 submitMessage 方法
				submitStrategy.submitAttempt((id, msg) -> submitExecutor.submit(() -> submitMessage(configuration, stats, ctx, id, msg)));
 
				//超时等待
				final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
 
				//创建结果
				TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
				if (timeout) {
					//超时处理
					printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
				}
				if (!ctx.getFailedMap().isEmpty()) {
					//失败处理
					printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed");
				}
				//打印统计信息
				ctx.printProfilerStats();
 
				//根据结果获取决策
				TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result);
				if (statsEnabled) {
					//记录是否提交完成
					stats.log(result, decision.isCommit());
				}
 
				//清理上下文
				ctx.cleanup();
 
				//判断决策
				if (decision.isCommit()) {
					//已提交
					//停止提交策略
					submitStrategy.stop();
					//退出循环
					break;
				} else {
					//未提交完毕
					//将决策的重试消息集合更新至提交策略,继续提交
					submitStrategy.update(decision.getReprocessMap());
				}
			}
			//消费端提交确认
			consumer.commit();
		} catch (Exception e) {
			if (!stopped) {
				log.warn("Failed to process messages from queue.", e);
				try {
					Thread.sleep(pollDuration);
				} catch (InterruptedException e2) {
					log.trace("Failed to wait until the server has capacity to handle new requests", e2);
				}
			}
		}
	}
	log.info("TB Rule Engine Consumer stopped.");
}

查看关键方法

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService
 
void submitMessage(TbRuleEngineQueueConfiguration configuration, TbRuleEngineConsumerStats stats, TbMsgPackProcessingContext ctx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {
	log.trace("[{}] Creating callback for topic {} message: {}", id, configuration.getName(), msg.getValue());
	//获取原始消息
	ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
	//获取租户标识
	TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
	//创建回调
	TbMsgCallback callback = prometheusStatsEnabled ? 
			new TbMsgPackCallback(id, tenantId, ctx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : 
			new TbMsgPackCallback(id, tenantId, ctx);
	try {
		if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
			//转发至规则引擎 Actor
			forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
		} else {
			//消息为空直接回调成功方法
			callback.onSuccess();
		}
	} catch (Exception e) {
		//回调失败方法
		callback.onFailure(new RuleEngineException(e.getMessage()));
	}
}

继续

//org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService
 
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
	//构建消息
	TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
	QueueToRuleEngineMsg msg;
	//获取关联类型列表
	ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
	Set<String> relationTypes = null;
	if (relationTypesList != null) {
		if (relationTypesList.size() == 1) {
			relationTypes = Collections.singleton(relationTypesList.get(0));
		} else {
			relationTypes = new HashSet<>(relationTypesList);
		}
	}
	//创建消息
	msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
	//使用 Actor 系统上下文发送消息
	actorContext.tell(msg);
}

先看一下QueueToRuleEngineMsg

package org.thingsboard.server.common.msg.queue;
 
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
 
import java.util.Set;
 
/**
 * Created by ashvayka on 15.03.18.
 */
@ToString
@EqualsAndHashCode(callSuper = true)
public final class QueueToRuleEngineMsg extends TbRuleEngineActorMsg {
 
    @Getter
    private final TenantId tenantId;
    @Getter
    private final Set<String> relationTypes;
    @Getter
    private final String failureMessage;
 
    public QueueToRuleEngineMsg(TenantId tenantId, TbMsg tbMsg, Set<String> relationTypes, String failureMessage) {
        super(tbMsg);
        this.tenantId = tenantId;
        this.relationTypes = relationTypes;
        this.failureMessage = failureMessage;
    }
 
    @Override
    public MsgType getMsgType() {
        return MsgType.QUEUE_TO_RULE_ENGINE_MSG;
    }
 
    @Override
    public void onTbActorStopped(TbActorStopReason reason) {
        String message;
        if (msg.getRuleChainId() != null) {
            message = reason == TbActorStopReason.STOPPED ?
                    String.format("Rule chain [%s] stopped", msg.getRuleChainId().getId()) :
                    String.format("Failed to initialize rule chain [%s]!", msg.getRuleChainId().getId());
        } else {
            message = reason == TbActorStopReason.STOPPED ? "Rule chain stopped" : "Failed to initialize rule chain!";
        }
        msg.getCallback().onFailure(new RuleEngineException(message));
    }
 
    public boolean isTellNext() {
        return relationTypes != null && !relationTypes.isEmpty();
    }
 
}

得知消息类型为MsgType.QUEUE_TO_RULE_ENGINE_MSG,后面会用到

接着我们看Actor系统对消息的处理

//org.thingsboard.server.actors.ActorSystemContext
 
public void tell(TbActorMsg tbActorMsg) {
	appActor.tell(tbActorMsg);
}

appActor 为应用Actor,是整个Actor系统的根Actor,感兴趣可以自行阅读

根据之前的学习,我们了解到Actor的处理方法为boolean process(TbActor

//org.thingsboard.server.actors.service.ContextAwareActor
 
@Override
public boolean process(TbActorMsg msg) {
	if (log.isDebugEnabled()) {
		log.debug("Processing msg: {}", msg);
	}
	//处理消息
	if (!doProcess(msg)) {
		log.warn("Unprocessed message: {}!", msg);
	}
	return false;
}

可见,正真执行的方法为protected abstract boolean doProcess(TbActorMsg msg)

//org.thingsboard.server.actors.app.AppActor
 
@Override
protected boolean doProcess(TbActorMsg msg) {
	if (!ruleChainsInitialized) {
		//规则链未初始化
		//初始化租户 Actor
		initTenantActors();
		ruleChainsInitialized = true;
		if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
			log.warn("Rule Chains initialized by unexpected message: {}", msg);
		}
	}
	//判断消息类型
	switch (msg.getMsgType()) {
		case APP_INIT_MSG:
			break;
		case PARTITION_CHANGE_MSG:
			ctx.broadcastToChildren(msg);
			break;
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case TRANSPORT_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((TenantAwareMsg) msg, false);
			break;
		case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((TenantAwareMsg) msg, true);
			break;
		case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
			onToTenantActorMsg((EdgeEventUpdateMsg) msg);
			break;
		case SESSION_TIMEOUT_MSG:
			ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
			break;
		default:
			return false;
	}
	return true;
}

回忆之前消息类型为MsgType.QUEUE_TO_RULE_ENGIN

//org.thingsboard.server.actors.app.AppActor
 
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
	if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
		//消息来自系统,视为异常
		msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
	} else {
		if (!deletedTenants.contains(msg.getTenantId())) {
			//租户未被删除
			//获取或创建租户 Actor 并发送消息
			getOrCreateTenantActor(msg.getTenantId()).tell(msg);
		} else {
			//租户已删除,直接回调成功方法
			msg.getMsg().getCallback().onSuccess();
		}
	}
}

deletedTenants 用于记录删除的用户标识

//org.thingsboard.server.actors.app.AppActor
 
private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
	return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), 
			() -> DefaultActorService.TENANT_DISPATCHER_NAME, 
			() -> new TenantActor.ActorCreator(systemContext, tenantId));
}

直接查看TenantActor的doProcess方法

//org.thingsboard.server.actors.tenant.TenantActor
 
@Override
protected boolean doProcess(TbActorMsg msg) {
	if (cantFindTenant) {
		//找不到租户
		log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
		if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
			QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
			//直接回调成功方法
			queueMsg.getMsg().getCallback().onSuccess();
		} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
			TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
			//直接回调成功方法
			transportMsg.getCallback().onSuccess();
		}
		return true;
	}
	switch (msg.getMsgType()) {
		case PARTITION_CHANGE_MSG:
			PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;
			ServiceType serviceType = partitionChangeMsg.getServiceQueueKey().getServiceType();
			if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
				//To Rule Chain Actors
				broadcast(msg);
			} else if (ServiceType.TB_CORE.equals(serviceType)) {
				List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
					@Override
					protected boolean testEntityId(EntityId entityId) {
						return super.testEntityId(entityId) && !isMyPartition(entityId);
					}
				});
				deviceActorIds.forEach(id -> ctx.stop(id));
			}
			break;
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case TRANSPORT_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((DeviceAwareMsg) msg, false);
			break;
		case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
		case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
		case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
			onToDeviceActorMsg((DeviceAwareMsg) msg, true);
			break;
		case SESSION_TIMEOUT_MSG:
			ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
			break;
		case RULE_CHAIN_INPUT_MSG:
		case RULE_CHAIN_OUTPUT_MSG:
		case RULE_CHAIN_TO_RULE_CHAIN_MSG:
			onRuleChainMsg((RuleChainAwareMsg) msg);
			break;
		case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
			onToEdgeSessionMsg((EdgeEventUpdateMsg) msg);
			break;
		default:
			return false;
	}
	return true;
}

查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方法

//org.thingsboard.server.actors.tenant.TenantActor
 
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
	//检查当前服务是否为规则引擎服务
	if (!isRuleEngine) {
		log.warn("RECEIVED INVALID MESSAGE: {}", msg);
		return;
	}
	TbMsg tbMsg = msg.getMsg();
	//检查规则引擎是否启用状态
	if (getApiUsageState().isReExecEnabled()) {
		//判断规则链标识是否为空
		if (tbMsg.getRuleChainId() == null) {
			//获取根链 Actor 并判断是否为空
			if (getRootChainActor() != null) {
				//向根链 Actor 发送消息
				getRootChainActor().tell(msg);
			} else {
				//无根链 Actor ,回调失败方法
				tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
				log.info("[{}] No Root Chain: {}", tenantId, msg);
			}
		} else {
			try {
				//向指定规则链发送消息
				ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
			} catch (TbActorNotRegisteredException ex) {
				log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
				//TODO: 3.1 Log it to dead letters queue;
				tbMsg.getCallback().onSuccess();
			}
		}
	} else {
		log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
		tbMsg.getCallback().onSuccess();
	}
}

跳过中间的步骤,直接看规则链Actor的doProcess方法即可

//org.thingsboard.server.actors.ruleChain.RuleChainActor
 
@Override
protected boolean doProcess(TbActorMsg msg) {
	switch (msg.getMsgType()) {
		case COMPONENT_LIFE_CYCLE_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case QUEUE_TO_RULE_ENGINE_MSG:
			processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
			break;
		case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
			processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
			break;
		case RULE_CHAIN_TO_RULE_CHAIN_MSG:
			processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
			break;
		case RULE_CHAIN_INPUT_MSG:
			processor.onRuleChainInputMsg((RuleChainInputMsg) msg);
			break;
		case RULE_CHAIN_OUTPUT_MSG:
			processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg);
			break;
		case PARTITION_CHANGE_MSG:
			processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
			break;
		case STATS_PERSIST_TICK_MSG:
			onStatsPersistTick(id);
			break;
		default:
			return false;
	}
	return true;
}

查看MsgType.QUEUE_TO_RULE_ENGINE_MSG类型的处理方

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
	TbMsg msg = envelope.getMsg();
	//验证消息
	if (!checkMsgValid(msg)) {
		return;
	}
	log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
	//判断是否包含关联类型
	if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
		onTellNext(msg, true);
	} else {
		onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
	}
}

此处的关联类型是什么?

1a06de03f7f0bbf5d304471d471f5fa0.png


结合规则链的结构联想一下,规则链本质是从上个节点传递到下个节点,节点传递之间有什么是可有可无的?

显然关联类型就是节点流转的条件

由于首节点是没有条件的,因此在构造消息时没有设置关联类型

注:每条规则链仅有一个首节点,且除首节点外的其他节点至少存在一个关联类型(TbRelationTypes.FAILURE),这部分感兴趣自行研究

先看没有关联类型的处理

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
	try {
		//检查组件(节点)状态是否正常
		checkComponentStateActive(msg);
		//获取节点标识
		RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
		RuleNodeCtx targetCtx;
		if (targetId == null) {
			//未指定目标节点
			//将当前规则链的首节点作为目标
			targetCtx = firstNode;
			//拷贝消息,entityId即当前规则链的标识
			msg = msg.copyWithRuleChainId(entityId);
		} else {
			//已指定目标节点,获取节点上下文
			targetCtx = nodeActors.get(targetId);
		}
		//判断上下文是否存在
		if (targetCtx != null) {
			log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
			//推送至节点
			pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
		} else {
			log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
			msg.getCallback().onSuccess();
		}
	} catch (RuleNodeException rne) {
		msg.getCallback().onFailure(rne);
	} catch (Exception e) {
		msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));
	}
}

查看下一个方法前,我们先了解一下节点上下文的结构

package org.thingsboard.server.actors.ruleChain;
 
import lombok.AllArgsConstructor;
import lombok.Data;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleNode;
 
/**
 * Created by ashvayka on 19.03.18.
 */
@Data
@AllArgsConstructor
final class RuleNodeCtx {
    private final TenantId tenantId;
    private final TbActorRef chainActor;
    private final TbActorRef selfActor;
    private RuleNode self;
}

很简单的结构,记录了租户标识,规则链Actor,自身节点Actor和自身节点

继续

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
	if (nodeCtx != null) {
		//创建消息并告知自身节点
		nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
	} else {
		log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
		msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));
	}
}

与QueueToRuleEngineMsg类似,查看可知RuleChainToRuleNodeMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_MSG,后面同样以此为处理条件

终于到节点的Actor了

//org.thingsboard.server.actors.ruleChain.RuleNodeActor
 
@Override
protected boolean doProcess(TbActorMsg msg) {
	switch (msg.getMsgType()) {
		case COMPONENT_LIFE_CYCLE_MSG:
		case RULE_NODE_UPDATED_MSG:
			onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
			break;
		case RULE_CHAIN_TO_RULE_MSG:
			onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
			break;
		case RULE_TO_SELF_MSG:
			onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
			break;
		case STATS_PERSIST_TICK_MSG:
			onStatsPersistTick(id);
			break;
		case PARTITION_CHANGE_MSG:
			onClusterEventMsg((PartitionChangeMsg) msg);
			break;
		default:
			return false;
	}
	return true;
}

查看MsgType.RULE_CHAIN_TO_RULE_MSG处理方法

//org.thingsboard.server.actors.ruleChain.RuleNodeActor
 
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) {
	TbMsg msg = envelope.getMsg();
	//验证消息
	if (!msg.isValid()) {
		if (log.isTraceEnabled()) {
			log.trace("Skip processing of message: {} because it is no longer valid!", msg);
		}
		return;
	}
	if (log.isDebugEnabled()) {
		log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg);
	}
	try {
		//处理消息
		processor.onRuleChainToRuleNodeMsg(envelope);
		//增加处理计数
		increaseMessagesProcessedCount();
	} catch (Exception e) {
		logAndPersist("onRuleMsg", e);
	}
}

继续

//org.thingsboard.server.actors.ruleChain.RuleNodeActorMessageProcessor
 
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
	//回调处理开始通知
	msg.getMsg().getCallback().onProcessingStart(info);
	//检查组件状态可用
	checkComponentStateActive(msg.getMsg());
	TbMsg tbMsg = msg.getMsg();
	//获取规则节点计数
	int ruleNodeCount = tbMsg.getAndIncrementRuleNodeCounter();
	//获取消息的最大规则节点执行次数
	int maxRuleNodeExecutionsPerMessage = getTenantProfileConfiguration().getMaxRuleNodeExecsPerMessage();
	//判断执行次数是否超限
	if (maxRuleNodeExecutionsPerMessage == 0 || ruleNodeCount < maxRuleNodeExecutionsPerMessage) {
		//上报规则引擎执行计数
		apiUsageClient.report(tenantId, tbMsg.getCustomerId(), ApiUsageRecordKey.RE_EXEC_COUNT);
		if (ruleNode.isDebugMode()) {
			systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
		}
		try {
			//执行节点处理方法
			tbNode.onMsg(msg.getCtx(), msg.getMsg());
		} catch (Exception e) {
			msg.getCtx().tellFailure(msg.getMsg(), e);
		}
	} else {
		tbMsg.getCallback().onFailure(new RuleNodeException("Message is processed by more then " + maxRuleNodeExecutionsPerMessage + " rule nodes!", ruleChainName, ruleNode));
	}
}

这里仅调用了tbNode的onMsg方法,那么节点的流转呢?

猜想所有节点继承自一个公共的父抽象类,该类中实现了节点的流转

那么看一下继承关系

199affb3b332d123b05ece9a29407f77.png

这些节点并没由继承公共的父类,流转方法没有抽离出来?

随便找一个节点看看,这里我看的是TbMsgTypeFilterNode

//org.thingsboard.rule.engine.filter.TbMsgTypeFilterNode
 
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
	ctx.tellNext(msg, config.getMessageTypes().contains(msg.getType()) ? "True" : "False");
}

验证了猜想,接着看下去

//org.thingsboard.server.actors.ruleChain.DefaultTbContext
 
@Override
public void tellSuccess(TbMsg msg) {
	tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
}
 
@Override
public void tellNext(TbMsg msg, String relationType) {
	tellNext(msg, Collections.singleton(relationType), null);
}
 
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
	tellNext(msg, relationTypes, null);
}

这里贴出了tellSuccess方法和另一个tellNext方法,它们有被其他节点使用

//org.thingsboard.server.actors.ruleChain.DefaultTbContext
 
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
	if (nodeCtx.getSelf().isDebugMode()) {
		relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
	}
	//回调处理结束通知
	msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
	//像规则链 Actor 发送消息
	nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}

和前面一样,查看RuleNodeToRuleChainTellNextMsg的消息类型为MsgType.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG

接下来又回到了规则链Actor的doProcess方法,找到对应的处理方法

//org.thingsboard.server.actors.ruleChain.RuleChainActor
 
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
	processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
	break;
//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
	var msg = envelope.getMsg();
	if (checkMsgValid(msg)) {
		onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());
	}
}

这里调用的onTellNext方法即前面onQueueToRuleEngineMsg方法中根据关联类型通知的方法

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
	try {
		checkComponentStateActive(msg);
		EntityId entityId = msg.getOriginator();
		//获取主题分区信息
		TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
 
		//根据来源节点标识获取关联(后续节点的指向)列表
		List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
		if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore
			log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId());
			ruleNodeRelations = Collections.emptyList();
		}
 
		//根据指定的关联类型筛选关联
		List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream()
				.filter(r -> contains(relationTypes, r.getType()))
				.collect(Collectors.toList());
		//获取关联个数
		int relationsCount = relationsByTypes.size();
		if (relationsCount == 0) {
			//没有后续节点了
			log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
			//判断当前关联是否包含失败(上个节点是否执行失败)
			if (relationTypes.contains(TbRelationTypes.FAILURE)) {
				//获取规则节点上下文
				RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
				if (ruleNodeCtx != null) {
					//回调消息的失败方法
					msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
				} else {
					log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
					//回调消息的失败方法
					msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
				}
			} else {
				//回调消息的成功方法
				msg.getCallback().onSuccess();
			}
		} else if (relationsCount == 1) {
			//后续仅一个节点
			//此处循环仅执行一次
			for (RuleNodeRelation relation : relationsByTypes) {
				log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
				//推送给目标
				pushToTarget(tpi, msg, relation.getOut(), relation.getType());
			}
		} else {
			//后续多个节点
			MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
			log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes);
			//遍历关联列表
			for (RuleNodeRelation relation : relationsByTypes) {
				EntityId target = relation.getOut();
				//推送至队列
				putToQueue(tpi, msg, callbackWrapper, target);
			}
		}
	} catch (RuleNodeException rne) {
		msg.getCallback().onFailure(rne);
	} catch (Exception e) {
		log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
		msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
	}
}

我们先看后续多个节点的推送

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {
	switch (target.getEntityType()) {
		case RULE_NODE:
			putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);
			break;
		case RULE_CHAIN:
			putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);
			break;
	}
}

根据实体类型拷贝消息,发送至队列

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) {
	//构建消息
	ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
			.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
			.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
			.setTbMsg(TbMsg.toByteString(newMsg))
			.build();
	//推送至规则引擎
	clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
}

继续:

//org.thingsboard.server.service.queue.DefaultTbClusterService
 
@Override
public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) {
	log.trace("PUSHING msg: {} to:{}", msg, tpi);
	//获取规则引擎消息生产端,发送消息
	producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
	toRuleEngineMsgs.incrementAndGet();
}

消息发送至队列,消费端接收到消息后,根据关联类型再次调用onTellNext方法处理,此时的消息仅指向单个后续节点

最后查看单个后续节点的处理

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
	//判断是否为当前服务负责的分区
	if (tpi.isMyPartition()) {
		//判断实体类型
		switch (target.getEntityType()) {
			case RULE_NODE:
				//推送至节点
				pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
				break;
			case RULE_CHAIN:
				//通知父 Actor
				parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
				break;
		}
	} else {
		//放入队列,交给负责的服务
		putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
	}
}

注:集群部署下,每个服务负责若干分区

pushMsgToNode方法和putToQueue方法前面都已经看过了,重点看跳转规则链

和前面一样,查看RuleChainToRuleChainMsg的消息类型为MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG

接着查看租户Actor中对应的处理方法

//org.thingsboard.server.actors.tenant.TenantActor
 
private void onRuleChainMsg(RuleChainAwareMsg msg) {
	if (getApiUsageState().isReExecEnabled()) {
		getOrCreateActor(msg.getRuleChainId()).tell(msg);
	}
}

再查看规则链Actor中对应的处理方法

//org.thingsboard.server.actors.ruleChain.RuleChainActorMessageProcessor
 
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
	var msg = envelope.getMsg();
	if (!checkMsgValid(msg)) {
		return;
	}
	try {
		checkComponentStateActive(envelope.getMsg());
		if (firstNode != null) {
			pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
		} else {
			envelope.getMsg().getCallback().onSuccess();
		}
	} catch (RuleNodeException e) {
		log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
	}
}

pushMsgToNode方法前面已经看过了,至此,我们已阅读完整个规则链的执行逻辑

510e8a53c66083a8d437b5291118e5d5.png

TopTop