Java: ThingsBoard Installation and a Brief Source Code Analysis

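The codebase runs to well over 100,000 lines, and the official site provides very thorough usage tutorials.
Learning steps:

  1. Get a rough picture of the architecture and basic features to decide whether it is worth studying further
  2. Set up a simple test environment
  3. Read the official documentation and follow it step by step
  4. Set up a development environment
  5. Debug the core flows and features to understand how the main modules are designed and implemented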

Trying the demo

To install with Docker, simply run:

mkdir -p ~/.mytb-data && sudo chown -R 799:799 ~/.mytb-data
mkdir -p ~/.mytb-logs && sudo chown -R 799:799 ~/.mytb-logs
docker run -it -p 8080:9090 -p 1883:1883 -p 5683:5683/udp -v ~/.mytb-data:/data \
-v ~/.mytb-logs:/var/log/thingsboard --name mytb --restart always thingsboard/tb-postgres

Open http://localhost:8080 and log in with one of these accounts (username/password):
tenant@thingsboard.org/tenant
sysadmin@thingsboard.org/sysadmin

curl -v -X POST -d "{\"temperature\": 10}" http://192.168.33.123:8080/api/v1/ABC123/telemetry --header "Content-Type:application/json"
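
The same telemetry upload can be sketched in Java with the JDK 11 java.net.http client. This is a minimal sketch, not ThingsBoard code: the class name TelemetryPostSketch and the helper buildTelemetryRequest are made up for illustration, and the host and the ABC123 access token are simply the ones from the curl command above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper class for illustration; not part of ThingsBoard.
final class TelemetryPostSketch {

    // Builds the same POST as the curl command: JSON body, device access token in the path.
    static HttpRequest buildTelemetryRequest(String baseUrl, String accessToken, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/" + accessToken + "/telemetry"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildTelemetryRequest(
                "http://192.168.33.123:8080", "ABC123", "{\"temperature\": 10}");
        // Sends the request; this needs a reachable ThingsBoard instance.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Keeping the request construction separate from the send call makes it easy to inspect the URL and headers without a running server.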

Database installation

docker run \
-d \
--name postgres \
-p 5432:5432 \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgres123456 \
-v ~/docker_data/pgsql/data:/var/lib/postgresql/data \
postgres:11


docker exec -it postgres /bin/bash
psql -U postgres
create database test;

docker run -d -p 9042:9042 --name cassandra --restart always cassandra

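Building and running the code

  1. Install JDK 11, IDEA, Maven, and Node.js 12.
  2. Install yarn: npm install -g yarn
  3. Run a clean build in IDEA; every module should report Success.
  4. Edit the database settings in application → thingsboard.yml.
  5. Copy the SQL files from the DAO module into data/sql under the application module, then run ThingsboardInstallApplication.
  6. Check node -v, npm -v, and yarn -v, then update the node and yarn version numbers throughout the project (<nodeVersion>) to match.
  7. The license-maven-plugin can be commented out.
  8. Tests can be skipped during maven install.
  9. Run ThingsboardServerApplication.
  10. Log in: sysadmin@thingsboard.org/sysadmin
  11. Create an account, for example: test@ak.com/test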

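Problem: org.thingsboard.server.gen.* cannot be found, and how to fix it

The org.thingsboard.server.gen.transport.* classes are generated automatically by protobuf. To fix the problem:
1. Check whether the protobuf plugin is installed in IDEA
2. Check whether the classes were generated successfully
3. Raise IDEA's file size limit

Install protobuf-jetbrains-plugin:
https://github.com/ksprojects/protobuf-jetbrains-plugin/releases (download the release build of the plugin, not the source archive)
In IDEA, open the … menu in the Plugins title bar and choose the local install option, Install Plugin from Disk…

Help > Edit Custom Properties, then set: idea.max.intellisense.filesize=9999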

Error:java: invalid flag: -Xlint:removal

Upgrade the JDK to 11:
https://jdk.java.net/archive/

Code analysis

Things the code relies on heavily: @PostConstruct, @EventListener (the observer pattern), @Builder, and Guava's ListenableFuture, SettableFuture, Futures.transform, and so on.
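
For readers who have not used the Guava future types, the idea can be sketched with the JDK's own CompletableFuture, which plays a similar role: completing a future by hand is comparable to SettableFuture.set(...), and thenApply is comparable in spirit to Futures.transform. This is only a JDK analogy under that assumption, not Guava and not ThingsBoard code, and the class name FutureChainSketch is made up.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogy for the Guava future style; not ThingsBoard code.
final class FutureChainSketch {

    // Turns a raw reading into a display string once the value arrives,
    // similar in spirit to transforming a ListenableFuture.
    static CompletableFuture<String> describe(CompletableFuture<Integer> temperature) {
        return temperature.thenApply(t -> "temperature=" + t);
    }

    public static void main(String[] args) {
        // Completed by hand later, much like a SettableFuture.
        CompletableFuture<Integer> reading = new CompletableFuture<>();
        CompletableFuture<String> described = describe(reading);

        reading.complete(10);                 // producer side supplies the value
        System.out.println(described.join()); // prints "temperature=10"
    }
}
```

The point of the pattern is that the caller receives a future immediately and the result is filled in later by whichever thread handles the reply.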

What happens when an HTTP request is sent

curl -v -X POST -d "{\"temperature\": 10}" http://192.168.33.123:8080/api/v1/ABC123/telemetry --header "Content-Type:application/json"
  1. The request enters the DeviceApiController postTelemetry() method

  2. DefaultTransportService doProcess → DefaultTbQueueRequestTemplate send → InMemoryTbQueueProducer send → InMemoryStorage put (topic=tb_transport.api.requests), after which the callback is invoked

  3. The request-polling code in DefaultTbQueueResponseTemplate init() picks up the request, and the callback sends the reply through responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);

  4. DefaultTbQueueRequestTemplate reads the response and starts a thread to process it and return the result to the calling HTTP or MQTT client

  5. For MQTT, the classes to focus on are MqttTransportService and MqttTransportHandler. The call chain is:
    MqttTransportHandler channelRead(), where the request enters
    → DefaultTransportService doProcess()
    → DefaultTbQueueRequestTemplate send(), which publishes to the tb_transport.api.requests queue
    → TbActorMailbox processMailbox(), initialized at startup, which fetches messages via poll()
    → ContextAwareActor process(), the template method of the template-method pattern, which calls the abstract doProcess implemented by subclasses
    → AppActor doProcess(), which hands the message to a different actor depending on its type by calling TbActorMailbox tell(), which in turn calls enqueue()
    → tryProcessQueue and then processMailbox() again, so that the target actor handles the message
    → RuleNodeActor doProcess(), for RuleChainToRuleNodeMsg
    → RuleNodeActor onRuleChainToRuleNodeMsg(){processor.onRuleChainToRuleNodeMsg(msg);}
    → RuleNodeActorMessageProcessor onRuleChainToRuleNodeMsg(), which in debug mode calls BaseEventService saveAsync() to record the operation

  6. At startup, org.thingsboard.server.actors.service.DefaultActorService initializes the actors and creates a root actor

  7. Once initialization is complete, DefaultActorService sends an AppInitMsg to the mailbox of the root actor in DefaultTbActorSystem (TbActorMailbox enqueue)

    public class DefaultActorService extends TbApplicationEventListener<PartitionChangeEvent> implements ActorService {
        @EventListener(ApplicationReadyEvent.class)
        @Order(value = 2)
        public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
            log.info("Received application ready event. Sending application init message to actor system");
            appActor.tellWithHighPriority(new AppInitMsg()); // there is a normal queue and a high-priority queue
        }
    }
  8. AppActor takes the message out of its mailbox and handles it via process

    public class AppActor extends ContextAwareActor {
        @Override
        protected boolean doProcess(TbActorMsg msg) {
            if (!ruleChainsInitialized) { // not initialized yet?
                initTenantActors(); // initialize
                ruleChainsInitialized = true;
                if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
                    log.warn("Rule Chains initialized by unexpected message: {}", msg);
                }
            }
            switch (msg.getMsgType()) {
                case APP_INIT_MSG:
                    break;
                case PARTITION_CHANGE_MSG:
                    ctx.broadcastToChildren(msg);
                    break;
                case COMPONENT_LIFE_CYCLE_MSG:
                    onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                    break;
                case QUEUE_TO_RULE_ENGINE_MSG:
                    onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                    break;
                case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                    onToDeviceActorMsg((TenantAwareMsg) msg, false);
                    break;
                case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
                case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
                case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
                case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
                case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
                    onToDeviceActorMsg((TenantAwareMsg) msg, true);
                    break;
                default:
                    return false;
            }
            return true;
        }
    }
  9. Query the database and load the TenantActors

    private void initTenantActors() {
        log.info("Starting main system actor.");
        try {
            // This Service may be started for specific tenant only.
            Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            if (isolatedTenantId.isPresent()) {
                Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                if (tenant != null) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId()); // get or create
                    log.debug("Tenant actor created.");
                } else {
                    log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                }
            } else if (systemContext.isTenantComponentsInitEnabled()) {
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                for (Tenant tenant : tenantIterator) {
                    TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
                    if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("[{}] Tenant actor created.", tenant.getId());
                    }
                }
            }
            log.info("Main system actor started.");
        } catch (Exception e) {
            log.warn("Unknown failure", e);
        }
    }
  10. Initialize the actor: TbActorMailbox initActor → tryInit

    public void initActor() {
        dispatcher.getExecutor().execute(() -> tryInit(1));
    }
  11. Initializing a TenantActor also initializes its RuleChainActors

    public class TenantActor extends RuleChainManagerActor {
        @Override
        public void init(TbActorCtx ctx) throws TbActorException {
            super.init(ctx);
            log.info("[{}] Starting tenant actor.", tenantId);
            try {
                Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
                if (tenant == null) {
                    cantFindTenant = true;
                    log.info("[{}] Started tenant actor for missing tenant.", tenantId);
                } else {
                    apiUsageState = new ApiUsageState(systemContext.getApiUsageStateService().getApiUsageState(tenant.getId()));

                    // This Service may be started for specific tenant only.
                    Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();

                    TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());

                    isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                    isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                    if (isRuleEngineForCurrentTenant) {
                        try {
                            if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenantProfile.isIsolatedTbRuleEngine())) {
                                if (apiUsageState.isReExecEnabled()) {
                                    log.info("[{}] Going to init rule chains", tenantId);
                                    initRuleChains(); // initialize the rule chains
                                } else {
                                    log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
                                }
                            } else {
                                isRuleEngineForCurrentTenant = false;
                            }
                        } catch (Exception e) {
                            cantFindTenant = true;
                        }
                    }
                    log.info("[{}] Tenant actor started.", tenantId);
                }
            } catch (Exception e) {
                log.warn("[{}] Unknown failure", tenantId, e);
            }
        }
    }

    public abstract class RuleChainManagerActor extends ContextAwareActor {
        protected void initRuleChains() {
            for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
                RuleChainId ruleChainId = ruleChain.getId();
                log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
                TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
                visit(ruleChain, actorRef);
                log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
            }
        }

        protected void visit(RuleChain entity, TbActorRef actorRef) {
            if (entity != null && entity.isRoot()) {
                rootChain = entity;
                rootChainActor = actorRef; // rootChainActor
            }
        }
    }
  12. Create the RuleChainActorMessageProcessor, which is responsible for handling messages

    public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
        @Override
        public void init(TbActorCtx ctx) throws TbActorException {
            super.init(ctx);
            this.processor = createProcessor(ctx); // RuleChainActorMessageProcessor
            initProcessor(ctx);
        }
    }
  13. RuleChainActorMessageProcessor initialization

    public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
        @Override
        public void start(TbActorCtx context) {
            if (!started) {
                RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
                if (ruleChain != null) {
                    List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId); // look up the rule nodes
                    log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
                    // Creating and starting the actors;
                    for (RuleNode ruleNode : ruleNodeList) {
                        log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                        TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                        nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
                    }
                    initRoutes(ruleChain, ruleNodeList); // initialize the routes that map the relations between nodes
                    started = true;
                }
            } else {
                onUpdate(context);
            }
        }
    }
  14. RuleNodeActorMessageProcessor is created when RuleNodeActor is initialized

    public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
        abstract protected P createProcessor(TbActorCtx ctx);

        @Override
        public void init(TbActorCtx ctx) throws TbActorException {
            super.init(ctx);
            this.processor = createProcessor(ctx);
            initProcessor(ctx);
        }
    }
    public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
        @Override
        protected RuleNodeActorMessageProcessor createProcessor(TbActorCtx ctx) {
            return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx.getParentRef(), ctx);
        }
    }
    public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
        @Override
        public void start(TbActorCtx context) throws Exception {
            tbNode = initComponent(ruleNode);
            if (tbNode != null) {
                state = ComponentLifecycleState.ACTIVE;
            }
        }
    }
  15. Database operations

    • initActor → tryInit calls BaseEventService save to store the startup event in the database
    • TbActorMailbox processMailbox → actor.process(msg) … eventually calls BaseEventService saveAsync to write to the database
      saveAsync:55, BaseEventService (org.thingsboard.server.dao.event)
      persistDebugAsync:489, ActorSystemContext (org.thingsboard.server.actors)
      persistDebugInput:447, ActorSystemContext (org.thingsboard.server.actors)
      onRuleChainToRuleNodeMsg:132, RuleNodeActorMessageProcessor (org.thingsboard.server.actors.ruleChain)
      onRuleChainToRuleNodeMsg:94, RuleNodeActor (org.thingsboard.server.actors.ruleChain)
      doProcess:60, RuleNodeActor (org.thingsboard.server.actors.ruleChain)
      process:45, ContextAwareActor (org.thingsboard.server.actors.service)
      processMailbox:141, TbActorMailbox (org.thingsboard.server.actors)
      run:-1, 1268144148 (org.thingsboard.server.actors.TbActorMailbox$$Lambda$1471)
      exec:1426, ForkJoinTask$RunnableExecuteAction (java.util.concurrent)
      doExec:290, ForkJoinTask (java.util.concurrent)
      topLevelExec:1020, ForkJoinPool$WorkQueue (java.util.concurrent)
      scan:1656, ForkJoinPool (java.util.concurrent)
      runWorker:1594, ForkJoinPool (java.util.concurrent)
      run:177, ForkJoinWorkerThread (java.util.concurrent)
  16. The InMemoryTbQueueConsumer poll method is called continuously, for example by DefaultTbQueueResponseTemplate, DefaultTbRuleEngineConsumerService, DefaultTbQueueRequestTemplate, and others. The storage contents look like this:

    "tb_core.4" -> {LinkedBlockingQueue@26304}  size = 0
    "tb_core.2" -> {LinkedBlockingQueue@26306} size = 0
    "tb_usage_stats.5" -> {LinkedBlockingQueue@26308} size = 1
    "tb_transport.api.responses.DESKTOP-EFHK05P" -> {LinkedBlockingQueue@26309} size = 0
    "tb_rule_engine.main.2" -> {LinkedBlockingQueue@26311} size = 0
    "tb_transport.notifications.DESKTOP-EFHK05P" -> {LinkedBlockingQueue@26313} size = 0
    "tb_transport.api.requests" -> {LinkedBlockingQueue@26314} size = 0
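
The storage dump above suggests a map from topic name to a LinkedBlockingQueue, with producers appending and consumers polling. A minimal sketch of that idea follows. It is an assumption-laden illustration rather than the ThingsBoard implementation: the class InMemoryTopicStoreSketch, its put and poll signatures, and the use of plain String messages are all made up here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for an in-memory topic store; not the ThingsBoard class.
final class InMemoryTopicStoreSketch {

    private final ConcurrentHashMap<String, BlockingQueue<String>> storage = new ConcurrentHashMap<>();

    // Producer side: append a message to the topic's queue, creating the queue on first use.
    boolean put(String topic, String msg) {
        return storage.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>()).add(msg);
    }

    // Consumer side: drain whatever is currently queued; an empty list means nothing has arrived yet.
    List<String> poll(String topic) {
        List<String> batch = new ArrayList<>();
        BlockingQueue<String> queue = storage.get(topic);
        if (queue != null) {
            queue.drainTo(batch);
        }
        return batch;
    }

    public static void main(String[] args) {
        InMemoryTopicStoreSketch store = new InMemoryTopicStoreSketch();
        store.put("tb_transport.api.requests", "{\"temperature\": 10}");
        System.out.println(store.poll("tb_transport.api.requests")); // one queued message
        System.out.println(store.poll("tb_transport.api.requests")); // queue is empty again
    }
}
```

Because consumers drain the queue on every poll, most queues show size = 0 when inspected in a debugger, which matches the dump.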

Other notes:
DefaultTbQueueRequestTemplate/InMemoryTbQueueProducer/InMemoryStorage: queue-related
TbServiceInfoProvider/PartitionService/DiscoveryService/DefaultTbCoreConsumerService/DefaultTbRuleEngineConsumerService
DefaultTransportService/TbCoreTransportApiService
DefaultActorService/RuleChainManagerActor/TbActorMailbox/ComponentActor
TbActorMailbox: messages are processed in the mailbox
TbActorMailbox processMailbox: message flow processing, the key call chain
RuleNodeActor doProcess: handled here
DefaultTelemetrySubscriptionService saveAndNotify
BaseEventService public ListenableFuture saveAsync(Event event): writes the message to the database
@EventListener(ApplicationReadyEvent.class): the various startup entry points
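
The mailbox idea behind tell, tellWithHighPriority, and processMailbox can be sketched as two queues drained in priority order. This is a single-threaded simplification under stated assumptions: the class MailboxSketch, its field names, and the Consumer standing in for actor.process(msg) are hypothetical, and the dispatcher and executor handling of the real mailbox is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical, single-threaded simplification of an actor mailbox; not the ThingsBoard class.
final class MailboxSketch {

    private final ConcurrentLinkedQueue<String> highPriorityMsgs = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<String> normalPriorityMsgs = new ConcurrentLinkedQueue<>();
    private final Consumer<String> actor; // stands in for actor.process(msg)

    MailboxSketch(Consumer<String> actor) {
        this.actor = actor;
    }

    void tell(String msg) {
        normalPriorityMsgs.add(msg);
    }

    void tellWithHighPriority(String msg) {
        highPriorityMsgs.add(msg);
    }

    // Drains the mailbox, always preferring the high-priority queue when it has messages.
    void processMailbox() {
        while (true) {
            String msg = highPriorityMsgs.poll();
            if (msg == null) {
                msg = normalPriorityMsgs.poll();
            }
            if (msg == null) {
                return; // both queues are empty
            }
            actor.accept(msg);
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        MailboxSketch mailbox = new MailboxSketch(handled::add);
        mailbox.tell("telemetry");
        mailbox.tellWithHighPriority("APP_INIT_MSG");
        mailbox.processMailbox();
        System.out.println(handled); // the high-priority message is handled first
    }
}
```

Since each actor only ever sees one message at a time from its own mailbox, actor state does not need locking, which is the main appeal of the model.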

App Actor/Tenant Actor/Device Actor/Rule Chain Actor/Rule Node Actor
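
The actor levels listed above form a parent-child tree in which a parent creates a child the first time it is needed, in the style of getOrCreateTenantActor. A minimal sketch of that lazy creation follows. The class ActorTreeSketch, the slash-separated path, and the ids tenant-1 and device-A are invented for illustration and do not come from ThingsBoard.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of lazily created child actors; not ThingsBoard code.
final class ActorTreeSketch {

    private final String path;
    private final Map<String, ActorTreeSketch> children = new HashMap<>();

    ActorTreeSketch(String path) {
        this.path = path;
    }

    // Returns the existing child for this id, or creates it on first use.
    ActorTreeSketch getOrCreateChild(String childId) {
        return children.computeIfAbsent(childId, id -> new ActorTreeSketch(path + "/" + id));
    }

    String path() {
        return path;
    }

    int childCount() {
        return children.size();
    }

    public static void main(String[] args) {
        ActorTreeSketch app = new ActorTreeSketch("app");
        ActorTreeSketch tenant = app.getOrCreateChild("tenant-1");
        ActorTreeSketch device = tenant.getOrCreateChild("device-A");
        System.out.println(device.path()); // prints "app/tenant-1/device-A"
    }
}
```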

To understand how the actor model handles interaction between threads, it helps to be familiar with springboot-akka.