大概有10几万行代码量,官方提供非常全的软件使用教程。
学习步骤:
- 大概了解软件架构,基本功能确定是否需要继续研究
- 搭建简单测试环境
- 看官方文档,按照文档进行操作
- 搭建开发环境
- 调试核心流程与功能,了解主要模块是如何设计实现的
查看演示效果
docker 安装简单执行:
1 | mkdir -p ~/.mytb-data && sudo chown -R 799:799 ~/.mytb-data |
访问:http://localhost:8080
进行登录tenant@thingsboard.org
/tenant
sysadmin@thingsboard.org
/sysadmin
1 | curl -v -X POST -d "{\"temperature\": 10}" http://192.168.33.123:8080/api/v1/ABC123/telemetry --header "Content-Type:application/json" |
数据库安装
1 | docker run \ |
1 | docker run -d -p9042:9042 --name cassandra --restart always cassandra |
编译运行代码
- 安装jdk11 idea maven nodejs12,
- 安装yarn,npm install -g yarn
- idea中 clean编译代码,全部Success
- 修改application→thingsboard.yml中数据库配置
- 把DAO模块的SQL拷贝到application模块data/sql下 执行ThingsboardInstallApplication
- node -v,npm -v,yarn -v全局修改项目中的node yarn版本号
<nodeVersion>
- 可以注释掉license-maven-plugin
- maven install 可以跳过测试。
- 运行ThingsboardServerApplication
- 登录:sysadmin@thingsboard.org/sysadmin
- 创建账号比如:test@ak.com/test
问题 Thingsboard org.thingsboard.server.gen.*找不到解决方法
org.thingsboard.server.gen.transport.*是由protobuf自动生成的类,解决这个问题步骤:
1.IDEA是否安装potobuf插件
2.检查类是否成功生成
3.修改IDEA的filesize大小
安装:protobuf-jetbrains-plugin
https://github.com/ksprojects/protobuf-jetbrains-plugin/releases 下载release的插件,不是源码
在idea plugins标题栏的…中选择本地安装 install plugin from disk…
help > 编辑自定义属性,配置:idea.max.intellisense.filesize=9999
Error:java: 无效的标记: -Xlint:removal
升级JDK到11
https://jdk.java.net/archive/
代码分析
@PostConstruct
, @EventListener
观察者模式,@Builder
,Guava
ListenalbeFuture SettableFuture Futures.transform等
发送一个http请求的过程
1 | curl -v -X POST -d "{\"temperature\": 10}" http://192.168.33.123:8080/api/v1/ABC123/telemetry --header "Content-Type:application/json" |
进入
DeviceApiController postTelemetry()
方法DefaultTransportService doProcess
DefaultTbQueueRequestTemplate send
InMemoryTbQueueProducer send
InMemoryStorage put
(topic=tb_transport.api.requests) 然后调用callback执行
DefaultTbQueueResponseTemplate init()
中的获取请求的代码,通过callback发送返回信息到responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
DefaultTbQueueRequestTemplate
从response
中获取信息,创建线程进行处理返回给前端httpclient或者mqttclient对于MQTT主要关注
MqttTransportService
MqttTransportHandler
请求进入MqttTransportHandler channelRead()
→DefaultTransportService doProcess()
→DefaultTbQueueRequestTemplate send()
发往tb_transport.api.requests队列 → TbActorMailbox processMailbox() 启动时进行init 通过poll()获取消息 → ContextAwareActor process() 模板模式的模板方法 调用abstract doProcess由子类实现 →AppActor doProcess()
根据类型转给不同Actor处理 调用TbActorMailbox tell() 里面的 enqueue() → 然后继续tryProcessQueue 继续processMailbox() 交由不同actor处理,RuleNodeActor doProcess() 规则有(RuleChainToRuleNodeMsg RuleChainToRuleNodeMsg)
→RuleNodeActor onRuleChainToRuleNodeMsg(){processor.onRuleChainToRuleNodeMsg(msg);}
然后RuleNodeActorMessageProcessor onRuleChainToRuleNodeMsg()
debug模式调用BaseEventService saveAsync()记录操作启动的时候
org.thingsboard.server.actors.service.DefaultActorService
会进行Actor的初始化,会创建一个根Actor初始化完成,DefaultActorService会往DefaultTbActorSystem的根Actor的Mail里面发送一条AppInitMsg消息。TbActorMailbox enqueue
1
2
3
4
5
6
7
8public class DefaultActorService extends TbApplicationEventListener<PartitionChangeEvent> implements ActorService {
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
log.info("Received application ready event. Sending application init message to actor system");
appActor.tellWithHighPriority(new AppInitMsg()); // 有普通队列与优先队列
}
}AppActor会从它的Mail里面把消息拿出来,通过process处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class AppActor extends ContextAwareActor {
protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) { // 是否初始化
initTenantActors(); // 初始化
ruleChainsInitialized = true;
if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
log.warn("Rule Chains initialized by unexpected message: {}", msg);
}
}
switch (msg.getMsgType()) {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg);
break;
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, false);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, true);
break;
default:
return false;
}
return true;
}
}查询数据库加载TelentActor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32private void initTenantActors() {
log.info("Starting main system actor.");
try {
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
if (isolatedTenantId.isPresent()) {
Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
if (tenant != null) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId()); // get or create
log.debug("Tenant actor created.");
} else {
log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
}
} else if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
for (Tenant tenant : tenantIterator) {
TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
}
}
}
log.info("Main system actor started.");
} catch (Exception e) {
log.warn("Unknown failure", e);
}
}初始化Actor,TbActorMailbox initActor tryInit
1
2
3public void initActor() {
dispatcher.getExecutor().execute(() -> tryInit(1));
}初始化TenantActor的时候同时RuleChainActor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public class TenantActor extends RuleChainManagerActor {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.info("[{}] Starting tenant actor.", tenantId);
try {
Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
if (tenant == null) {
cantFindTenant = true;
log.info("[{}] Started tenant actor for missing tenant.", tenantId);
} else {
apiUsageState = new ApiUsageState(systemContext.getApiUsageStateService().getApiUsageState(tenant.getId()));
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
if (isRuleEngineForCurrentTenant) {
try {
if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenantProfile.isIsolatedTbRuleEngine())) {
if (apiUsageState.isReExecEnabled()) {
log.info("[{}] Going to init rule chains", tenantId);
initRuleChains(); // 初始化RuleChains
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
} else {
isRuleEngineForCurrentTenant = false;
}
} catch (Exception e) {
cantFindTenant = true;
}
}
log.info("[{}] Tenant actor started.", tenantId);
}
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
}
}
}
public abstract class RuleChainManagerActor extends ContextAwareActor {
protected void initRuleChains() {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
}
protected void visit(RuleChain entity, TbActorRef actorRef) {
if (entity != null && entity.isRoot()) {
rootChain = entity;
rootChainActor = actorRef; // rootChainActor
}
}
}创建RuleChainActorMessageProcessor负责消息的处理
1
2
3
4
5
6
7
8public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
this.processor = createProcessor(ctx); //RuleChainActorMessageProcessor
initProcessor(ctx);
}
}RuleChainActorMessageProcessor初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
public void start(TbActorCtx context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null) {
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId); //查询roleNode
log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList); // 初始化路由,进行关系对应
started = true;
}
} else {
onUpdate(context);
}
}
}RuleNodeActorMessageProcessor 在RuleNodeActor init的时候创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgProcessor<T>> extends ContextAwareActor {
abstract protected P createProcessor(TbActorCtx ctx);
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
this.processor = createProcessor(ctx);
initProcessor(ctx);
}
}
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
protected RuleNodeActorMessageProcessor createProcessor(TbActorCtx ctx) {
return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx.getParentRef(), ctx);
}
}
public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNodeId> {
public void start(TbActorCtx context) throws Exception {
tbNode = initComponent(ruleNode);
if (tbNode != null) {
state = ComponentLifecycleState.ACTIVE;
}
}
}数据库操作
- initActor tryInit 会调用 BaseEventService save 保存启动事件到数据库中
- TbActorMailbox processMailbox actor.process(msg)… 最终会调用 BaseEventService saveAsync保存数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15saveAsync:55, BaseEventService (org.thingsboard.server.dao.event)
persistDebugAsync:489, ActorSystemContext (org.thingsboard.server.actors)
persistDebugInput:447, ActorSystemContext (org.thingsboard.server.actors)
onRuleChainToRuleNodeMsg:132, RuleNodeActorMessageProcessor (org.thingsboard.server.actors.ruleChain)
onRuleChainToRuleNodeMsg:94, RuleNodeActor (org.thingsboard.server.actors.ruleChain)
doProcess:60, RuleNodeActor (org.thingsboard.server.actors.ruleChain)
process:45, ContextAwareActor (org.thingsboard.server.actors.service)
processMailbox:141, TbActorMailbox (org.thingsboard.server.actors)
run:-1, 1268144148 (org.thingsboard.server.actors.TbActorMailbox$$Lambda$1471)
exec:1426, ForkJoinTask$RunnableExecuteAction (java.util.concurrent)
doExec:290, ForkJoinTask (java.util.concurrent)
topLevelExec:1020, ForkJoinPool$WorkQueue (java.util.concurrent)
scan:1656, ForkJoinPool (java.util.concurrent)
runWorker:1594, ForkJoinPool (java.util.concurrent)
run:177, ForkJoinWorkerThread (java.util.concurrent)
InMemoryTbQueueConsumer poll方法一直被调用,比如DefaultTbQueueResponseTemplate、DefaultTbRuleEngineConsumerService、DefaultTbQueueRequestTemplate等 ,storage内容如下:
1
2
3
4
5
6
7"tb_core.4" -> {LinkedBlockingQueue@26304} size = 0
"tb_core.2" -> {LinkedBlockingQueue@26306} size = 0
"tb_usage_stats.5" -> {LinkedBlockingQueue@26308} size = 1
"tb_transport.api.responses.DESKTOP-EFHK05P" -> {LinkedBlockingQueue@26309} size = 0
"tb_rule_engine.main.2" -> {LinkedBlockingQueue@26311} size = 0
"tb_transport.notifications.DESKTOP-EFHK05P" -> {LinkedBlockingQueue@26313} size = 0
"tb_transport.api.requests" -> {LinkedBlockingQueue@26314} size = 0
其他说明:
DefaultTbQueueRequestTemplate/InMemoryTbQueueProducer/InMemoryTbQueueProducer/InMemoryStorage 队列相关
TbServiceInfoProvider/PartitionService/DiscoveryService/DefaultTbCoreConsumerService/DefaultTbRuleEngineConsumerService
DefaultTransportService/TbCoreTransportApiService
DefaultActorService/RuleChainManagerActor/TbActorMailbox/ComponentActor
TbActorMailbox邮箱中进行处理
TbActorMailbox processMailbox 消息流处理,关键调用链
RuleNodeActor doProcess 处理这
DefaultTelemetrySubscriptionService saveAndNotify
BaseEventService public ListenableFuture
@EventListener(ApplicationReadyEvent.class) 各种启动入口
App Actor/Tenant Actor/Device Actor/Rule Chain Actor/Rule Node Actor
Actor模型进行线程交互需要了解springboot-akka