01 ThingsBoard 中 Actor 和 RuleEngine 的启动流程

2024-12-26
0 122

一、基本关系

1、单体服务
[图:单体服务下的基本关系(原图缺失)]
2、微服务
[图:微服务下的基本关系(原图缺失)]
3、创建流程
[图:创建流程(原图缺失)]

Reference

二、DefaultActorService

DefaultActorService是整个规则引擎的初始化入口。

DefaultActorService会初始化一个使用actor模型的规则引擎,共分为2步:

  • ①创建actorSystem;
  • ②处理应用初始化完成事件

1、创建actorSystem

(1) 创建appActor
/**
 * Builds the actor system when this Spring bean is constructed.
 * <p>
 * Creates a {@code DefaultTbActorSystem}, registers one dispatcher each for the app, tenant,
 * device and rule actors, then creates the two root actors ({@code AppActor} and
 * {@code StatsActor}) and publishes them on {@code actorContext}.
 */
@PostConstruct
public void initActorSystem() {
    log.info("Initializing actor system.");
    actorContext.setActorService(this);
    TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
    system = new DefaultTbActorSystem(settings);

    // One dispatcher per actor category, each built with its own configured size.
    system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
    system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
    system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
    system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

    actorContext.setActorSystem(system);

    // Create the AppActor as a root actor (no parent); there is a single, global AppActor.
    appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
    actorContext.setAppActor(appActor);

    // StatsActor is also a root actor, but it runs on the tenant dispatcher.
    TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
    actorContext.setStatsActor(statsActor);

    log.info("Actor system initialized.");
}

DefaultTbActorSystem.createActor

/**
 * Creates the actor described by {@code creator}, or returns the mailbox already registered
 * under the same actor id.
 * <p>
 * Does a lock-free lookup in {@code actors} first. On a miss it takes a per-actor-id lock and
 * checks {@code actors} again before creating, so concurrent callers for the same id normally
 * end up sharing one mailbox.
 *
 * @param dispatcherId id of a dispatcher previously registered via {@code createDispatcher}
 * @param creator      factory for both the actor id and the actor instance
 * @param parent       id of the parent actor, or {@code null} for a root actor
 * @return the mailbox (used as the actor reference) registered for the actor id
 * @throws RuntimeException              if no dispatcher is registered under {@code dispatcherId}
 * @throws TbActorNotRegisteredException if {@code parent} is non-null but not registered
 */
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
    Dispatcher dispatcher = dispatchers.get(dispatcherId);
    if (dispatcher == null) {
        log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
        throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
    }

    TbActorId actorId = creator.createActorId();
    TbActorMailbox actorMailbox = actors.get(actorId);
    if (actorMailbox != null) {
        log.debug("Actor with id [{}] is already registered!", actorId);
    } else {
        Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
        actorCreationLock.lock();
        try {
            // Re-check under the lock: another thread may have created the actor meanwhile.
            actorMailbox = actors.get(actorId);
            if (actorMailbox == null) {
                log.debug("Creating actor with id [{}]!", actorId);
                // For the root call from initActorSystem this is an AppActor instance.
                TbActor actor = creator.createActor();
                TbActorRef parentRef = null;
                if (parent != null) {
                    parentRef = getActor(parent);
                    if (parentRef == null) {
                        throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
                    }
                }
                TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
                // Registered before init, so the actor is visible to lookups while it initializes.
                actors.put(actorId, mailbox);
                // Eventually invokes actor.init(...) (here AppActor.init) via TbActorMailbox.
                mailbox.initActor();
                actorMailbox = mailbox;
                if (parent != null) {
                    parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
                }
            } else {
                log.debug("Actor with id [{}] is already registered!", actorId);
            }
        } finally {
            actorCreationLock.unlock();
            // NOTE(review): removing the lock right after unlock means a thread that calls
            // computeIfAbsent afterwards gets a fresh lock, while a thread already waiting on the
            // old lock can enter at the same time. The re-check of actors.get() covers the normal
            // case; if creation failed before actors.put(), two creators could race — confirm.
            actorCreationLocks.remove(actorId);
        }
    }
    return actorMailbox;
}

appActor.init()

在初始化appActor时,如果当前服务包含TB_CORE角色,会启动一个定时任务。该任务以sessionReportTimeout作为初始延迟和周期,向appActor发送SessionTimeoutCheckMsg,用于清除过期的sessionInfo。后续做扩展时需要注意:sessionInfo长时间不用可能会被清除。

/**
 * Initializes the AppActor.
 * <p>
 * Only when this service has the TB_CORE role, schedules a periodic
 * {@code SessionTimeoutCheckMsg}. The session report timeout is used both as the initial delay
 * and as the period. Presumably the message is delivered back to this actor through
 * {@code ctx}, and its handler evicts expired session info — confirm.
 */
public void init(TbActorCtx ctx) throws TbActorException {
    super.init(ctx);
    if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {
        systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),
                systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
    }
}
(2) 创建statsActor

用于统计状态,创建过程与appActor相同。

**注:appActor和statsActor全局唯一(二者都是通过createRootActor创建的根actor)。**

2、处理应用初始化完成事件

应用初始化结束后,会收到Spring发布的ApplicationReadyEvent事件,此时向appActor发送一条AppInitMsg消息。appActor收到后为每一个租户创建一个tenantActor,再由tenantActor初始化该租户的ruleChain及其ruleNode。

/**
 * Kicks off actor-tree initialization once the application is ready.
 * <p>
 * Runs as an after-start-up hook in the {@code AfterStartUp.ACTOR_SYSTEM} order slot. It sends
 * an {@code AppInitMsg} to the AppActor with high priority, which triggers creation of the
 * tenant actors.
 *
 * @param applicationReadyEvent the Spring ready event (not otherwise used here)
 */
@AfterStartUp(order = AfterStartUp.ACTOR_SYSTEM)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
    log.info("Received application ready event. Sending application init message to actor system");
    appActor.tellWithHighPriority(new AppInitMsg());
}

appActor.doProcess()

/**
 * Message handler of the AppActor (excerpt; the remaining dispatch is elided below).
 * <p>
 * Until the first {@code APP_INIT_MSG} has been processed, any other message is acknowledged
 * by returning {@code true} without processing it. A warning is logged unless the message type
 * is marked ignore-on-start.
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    // NOTE(review): looks like a debug trace with a garbled message at info level; consider
    // rewording it or lowering it to debug/trace.
    log.info("app actor s msg s msg type {}", msg.getMsgType());
    if (!ruleChainsInitialized) {
        if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
            // First APP_INIT_MSG: create an actor for every tenant, then open the gate.
            initTenantActors();
            ruleChainsInitialized = true;
        } else {
            if (!msg.getMsgType().isIgnoreOnStart()) {
                log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
            }
            return true;
        }
    }
    // ... (remaining message dispatch omitted in this excerpt)
}

appActor.initTenantActors()

/**
 * Creates a TenantActor for every tenant, provided tenant component initialization is enabled.
 * <p>
 * Tenants are iterated through {@code PageDataIterable} (presumably {@code ENTITY_PACK_LIMIT}
 * per page). The whole loop sits in one try block, so any exception stops it and is only
 * logged. Tenants after the failing one get no actor from this call.
 */
private void initTenantActors() {
    log.info("Starting main system actor.");
    try {
        if (systemContext.isTenantComponentsInitEnabled()) {
            // Iterate over all tenants.
            PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
            for (Tenant tenant : tenantIterator) {
                log.debug("[{}] Creating tenant actor", tenant.getId());
                // Create (or reuse) the tenant's actor; an empty Optional means creation was skipped.
                getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {
                    log.debug("[{}] Tenant actor created.", tenant.getId());
                }, () -> {
                    log.debug("[{}] Skipped actor creation", tenant.getId());
                });
            }
        }
        log.info("Main system actor started.");
    } catch (Exception e) {
        log.warn("Unknown failure", e);
    }
}

appActor.getOrCreateTenantActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。

传入的actorId是tenantId,ActorCreator是TenantActor.ActorCreator(会传入tenantId,后面初始化ruleChain时会用到)。

在DefaultTbActorSystem.createActor()方法中TbActorMailbox中的actor就是TenantActor.class类型,所以在mailbox.initActor();方法中会调用tenantActor.init()方法进行初始化。

tenantActor.init()

/**
 * Initializes a TenantActor.
 * <p>
 * If the tenant cannot be found, sets {@code cantFindTenant}. Otherwise it records whether this
 * service has the TB_CORE / TB_RULE_ENGINE roles. On a rule-engine service that manages this
 * tenant, it then initializes the tenant's rule chains, but only if
 * {@code getApiUsageState().isReExecEnabled()} is true. If that API-usage check throws,
 * {@code cantFindTenant} is set instead. Any other exception is logged, not rethrown.
 */
public void init(TbActorCtx ctx) throws TbActorException {
    super.init(ctx);
    log.debug("[{}] Starting tenant actor.", tenantId);
    try {
        Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
        if (tenant == null) {
            cantFindTenant = true;
            log.info("[{}] Started tenant actor for missing tenant.", tenantId);
        } else {
            isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
            isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
            if (isRuleEngine) {
                // Only the service that owns this tenant (per the partition service) starts its rule chains.
                if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
                    try {
                        if (getApiUsageState().isReExecEnabled()) {
                            log.debug("[{}] Going to init rule chains", tenantId);
                            // Create the rule chain actors for this tenant (see RuleChainManagerActor.initRuleChains).
                            initRuleChains();
                        } else {
                            log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
                        }
                    } catch (Exception e) {
                        log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
                        cantFindTenant = true;
                    }
                } else {
                    log.info("Tenant {} is not managed by current service, skipping rule chains init", tenantId);
                }
            }
            log.debug("[{}] Tenant actor started.", tenantId);
        }
    } catch (Exception e) {
        log.warn("[{}] Unknown failure", tenantId, e);
    }
}

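tenantActor初始化时,如果满足以下三个条件,就会调用initRuleChains()初始化该租户下的ruleChain:当前服务是规则引擎(TB_RULE_ENGINE);该租户由当前服务负责;API用量允许规则引擎执行(isReExecEnabled)。该方法定义在RuleChainManagerActor中,TenantActor继承自它,所以可以直接调用。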
RuleChainManagerActor.initRuleChains()

/**
 * Creates (or reuses) a child actor for every CORE-type rule chain of this tenant, then sets
 * {@code ruleChainsInitialized}.
 */
protected void initRuleChains() {
    log.debug("[{}] Initializing rule chains", tenantId);
    // Iterate over all CORE rule chains of this tenant, fetched through PageDataIterable.
    for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
        RuleChainId ruleChainId = ruleChain.getId();
        log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
        // Create (or reuse) the RuleChainActor; the lambda hands over the already loaded RuleChain.
        TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
        // visit(...) is not shown here; presumably it registers the chain/actor with this manager — confirm.
        visit(ruleChain, actorRef);
        log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
    }
    ruleChainsInitialized = true;
}

RuleChainManagerActor.getOrCreateActor() -> TbActorMailbox.getOrCreateChildActor() -> DefaultTbActorSystem.createChildActor() -> DefaultTbActorSystem.createActor()。

DefaultTbActorSystem.createActor()

/**
 * Creates the actor described by {@code creator}, or returns the mailbox already registered
 * under the same actor id.
 * <p>
 * Does a lock-free lookup in {@code actors} first. On a miss it takes a per-actor-id lock and
 * checks {@code actors} again before creating, so concurrent callers for the same id normally
 * end up sharing one mailbox.
 *
 * @param dispatcherId id of a dispatcher previously registered via {@code createDispatcher}
 * @param creator      factory for both the actor id and the actor instance
 * @param parent       id of the parent actor, or {@code null} for a root actor
 * @return the mailbox (used as the actor reference) registered for the actor id
 * @throws RuntimeException              if no dispatcher is registered under {@code dispatcherId}
 * @throws TbActorNotRegisteredException if {@code parent} is non-null but not registered
 */
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
    Dispatcher dispatcher = dispatchers.get(dispatcherId);
    if (dispatcher == null) {
        log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
        throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
    }

    TbActorId actorId = creator.createActorId();
    TbActorMailbox actorMailbox = actors.get(actorId);
    if (actorMailbox != null) {
        log.debug("Actor with id [{}] is already registered!", actorId);
    } else {
        Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
        actorCreationLock.lock();
        try {
            // Re-check under the lock: another thread may have created the actor meanwhile.
            actorMailbox = actors.get(actorId);
            if (actorMailbox == null) {
                log.debug("Creating actor with id [{}]!", actorId);
                // When called from initRuleChains this is a RuleChainActor instance.
                TbActor actor = creator.createActor();
                TbActorRef parentRef = null;
                if (parent != null) {
                    parentRef = getActor(parent);
                    if (parentRef == null) {
                        throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
                    }
                }
                TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
                actors.put(actorId, mailbox);
                // Eventually invokes actor.init(...). RuleChainActor does not override init, so the
                // inherited ComponentActor.init(...) runs.
                mailbox.initActor();
                actorMailbox = mailbox;
                if (parent != null) {
                    parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
                }
            } else {
                log.debug("Actor with id [{}] is already registered!", actorId);
            }
        } finally {
            actorCreationLock.unlock();
            // NOTE(review): removing the lock right after unlock lets a later caller obtain a fresh
            // lock while a waiter on the old one also proceeds; the re-check above covers the
            // normal case — confirm behavior when creation fails before actors.put().
            actorCreationLocks.remove(actorId);
        }
    }
    return actorMailbox;
}

ComponentActor.init()

/**
 * Shared initialization for component actors (e.g. RuleChainActor and RuleNodeActor).
 * <p>
 * Creates the subclass-specific message processor, then starts it via {@code initProcessor}.
 */
@Override
public void init(TbActorCtx ctx) throws TbActorException {
    super.init(ctx);
    // Subclass hook: for a RuleChainActor this returns a RuleChainActorMessageProcessor.
    this.processor = createProcessor(ctx);
    // Starts the processor; for a rule chain this is where its rule node actors get created.
    initProcessor(ctx);
}

RuleChainActor.createProcessor()

/**
 * Builds the message processor for this rule chain actor.
 * <p>
 * Passes the tenant id, the rule chain, the system context, this actor's parent reference and
 * the actor context to the processor.
 */
@Override
protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) {
    return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,
            ctx.getParentRef(), ctx);
}

ComponentActor.initProcessor()

/**
 * Starts {@code processor} and records a STARTED lifecycle event. If statistics are enabled,
 * also schedules the stats-persist tick.
 *
 * @param ctx the actor context handed to {@code processor.start}
 * @throws TbActorException wrapping any exception raised during the start sequence. Before
 *                          rethrowing, the failure is logged, persisted under "OnStart", and
 *                          recorded as a STARTED lifecycle event carrying the error.
 */
protected void initProcessor(TbActorCtx ctx) throws TbActorException {
    try {
        log.debug("[{}][{}][{}] Starting processor.", tenantId, id, id.getEntityType());
        // Delegates to the concrete processor; for a rule chain this creates its rule node actors.
        processor.start(ctx);
        logLifecycleEvent(ComponentLifecycleEvent.STARTED);
        if (systemContext.isStatisticsEnabled()) {
            scheduleStatsPersistTick();
        }
    } catch (Exception e) {
        log.debug("[{}][{}] Failed to start {} processor.", tenantId, id, id.getEntityType(), e);
        logAndPersist("OnStart", e, true);
        logLifecycleEvent(ComponentLifecycleEvent.STARTED, e);
        throw new TbActorException("Failed to init actor", e);
    }
}

RuleChainActorMessageProcessor.start()

/**
 * Starts the rule chain on the first call; later calls are treated as updates via
 * {@code onUpdate}.
 * <p>
 * On the first start it reloads the rule chain. Only if the chain still exists and is of type
 * CORE does it:
 * <ul>
 *   <li>create one actor per rule node and cache each in {@code nodeActors};</li>
 *   <li>build the routes between the nodes;</li>
 *   <li>mark the processor as started.</li>
 * </ul>
 * Otherwise {@code started} stays false.
 */
@Override
public void start(TbActorCtx context) {
    if (!started) {
        RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
        if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
            // Load the rule nodes that belong to this rule chain.
            List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
            log.debug("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
            // Creating and starting the actors;
            for (RuleNode ruleNode : ruleNodeList) {
                log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                // Create the RuleNodeActor for this node.
                TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                // Cache the node context (tenant, this chain's actor ref, node actor, node) by node id.
                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
            }
            initRoutes(ruleChain, ruleNodeList);
            started = true;
        }
    } else {
        onUpdate(context);
    }
}
  • createRuleNodeActor方法最终也会调用到DefaultTbActorSystem.createActor,此时创建的actor类型是RuleNodeActor.class;
  • 因此mailbox.initActor()会调用ruleNodeActor.init()。RuleNodeActor同样继承自ComponentActor,所以实际执行的也是ComponentActor.init();
  • 与ruleChainActor一样,其中的createProcessor()由RuleNodeActor实现,返回RuleNodeActorMessageProcessor;
  • 所以ComponentActor.initProcessor()中调用的processor.start(),就是RuleNodeActorMessageProcessor.start()。

RuleNodeActorMessageProcessor.start

/**
 * Starts the rule node, but only when {@code isMyNodePartition()} holds for this service.
 * <p>
 * In that case it instantiates and initializes the TbNode. If that yields a node, the state is
 * set to ACTIVE.
 */
@Override
public void start(TbActorCtx context) throws Exception {
    if (isMyNodePartition()) {
        log.debug("[{}][{}] Starting", tenantId, entityId);
        // Instantiate and initialize the concrete TbNode for this rule node.
        tbNode = initComponent(ruleNode);
        if (tbNode != null) {
            state = ComponentLifecycleState.ACTIVE;
        }
    }
}
/**
 * Instantiates and initializes the TbNode implementation for {@code ruleNode}.
 * <p>
 * {@code ruleNode.getType()} is treated as a fully qualified class name. The class is created
 * through its no-arg constructor, and its {@code init} is called with the node's configuration.
 *
 * @param ruleNode the rule node definition; may be {@code null}
 * @return the initialized node, or {@code null} when {@code ruleNode} is {@code null}
 * @throws Exception from class loading, reflective instantiation, the cast, or the node's own init
 */
private TbNode initComponent(RuleNode ruleNode) throws Exception {
    TbNode tbNode = null;
    if (ruleNode != null) {
        Class<?> componentClazz = Class.forName(ruleNode.getType());
        tbNode = (TbNode) (componentClazz.getDeclaredConstructor().newInstance());
        // Each concrete node's own init, driven by the stored rule node configuration.
        tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
    }
    return tbNode;
}

三、DeviceActor

deviceActor不是在server启动时创建的,而是在收到发往某个设备的消息(例如设备上报数据)时才按需创建。
具体逻辑在TenantActor.onToDeviceActorMsg中。

    /**
     * Routes a device-addressed message to that device's DeviceActor, creating the actor if it
     * does not exist yet.
     * <p>
     * Messages for devices in {@code deletedDevices} are dropped. When this service is not
     * TB_CORE, only a warning is logged and the message is still delivered.
     *
     * @param msg      message carrying the target device id
     * @param priority whether to enqueue the message with high priority
     */
    private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
        // NOTE(review): there is no return after this warning, so non-core services still forward
        // the message — confirm this is intended.
        if (!isCore) {
            log.warn("RECEIVED INVALID MESSAGE: {}", msg);
        }
        if (deletedDevices.contains(msg.getDeviceId())) {
            log.debug("RECEIVED MESSAGE FOR DELETED DEVICE: {}", msg);
            return;
        }
        // Get or lazily create the DeviceActor for this device.
        TbActorRef deviceActor = getOrCreateDeviceActor(msg.getDeviceId());
        if (priority) {
            deviceActor.tellWithHighPriority(msg);
        } else {
            deviceActor.tell(msg);
        }
    }

四、代码调试思路

因为整个server中Actor是核心,所以就从actor入手。

/**
 * Contract implemented by every actor in the ThingsBoard actor system.
 */
public interface TbActor {

    /**
     * Handles one message.
     *
     * @return a flag reported back to the caller (presumably the mailbox); its exact meaning is
     *         not shown here — TODO confirm
     */
    boolean process(TbActorMsg msg);

    /** Returns this actor's reference. */
    TbActorRef getActorRef();

    /** Initialization hook, invoked from the actor's mailbox (TbActorMailbox) on creation; no-op by default. */
    default void init(TbActorCtx ctx) throws TbActorException {
    }

    /** Cleanup hook, presumably called when the actor is stopped with the given reason; no-op by default. */
    default void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException {
    }

    /**
     * Decides what to do when {@code init} fails. By default: retry, with a delay that grows
     * linearly as 5000 × attempt (presumably milliseconds).
     */
    default InitFailureStrategy onInitFailure(int attempt, Throwable t) {
        return InitFailureStrategy.retryWithDelay(5000L * attempt);
    }

    /**
     * Decides what to do when processing a message fails. By default: stop the actor on an
     * {@link Error}, otherwise resume with the next message.
     */
    default ProcessFailureStrategy onProcessFailure(TbActorMsg msg, Throwable t) {
        if (t instanceof Error) {
            return ProcessFailureStrategy.stop();
        } else {
            return ProcessFailureStrategy.resume();
        }
    }
}

在TbActor中,init显然是初始化方法,会在actor创建时被调用。找到它的一个实现(如appActor.init()),用IDE的Find Usages查找调用处,会发现只有一个地方调用它:TbActorMailbox.tryInit。再沿着调用链往上找,就能找到DefaultActorService.initActorSystem()方法。