整个连接过程如图所示:
高清图片链接
1、环境准备
- thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
- MQTTX 客户端(用来连接 thingsboard MQTT)
- 默认配置。queue.type=in-memory,cache.type=caffeine
因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。
使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。
2、源码分析
2.1 连接消息生产
2.1.1 入口
大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService
thingsboard 采用 netty 来实现一个MQTT server。
org.thingsboard.server.transport.mqtt.MqttTransportService
@PostConstruct
public void init() throws Exception {
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context, false))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
serverChannel = b.bind(host, port).sync().channel();
if (sslEnabled) {
b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context, true))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
}
log.info("Mqtt transport started!");
}
其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount
thingsboard 提取出两个参数变量
NETTY_BOSS_GROUP_THREADS
NETTY_WORKER_GROUP_THREADS
方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。
netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。
在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。
org.thingsboard.server.transport.mqtt.MqttTransportHandler
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
ctx.close();
return;
}
deviceSessionCtx.setChannel(ctx);
if (CONNECT.equals(msg.fixedHeader().messageType())) {
processConnect(ctx, (MqttConnectMessage) msg);
} else if (deviceSessionCtx.isProvisionOnly()) {
processProvisionSessionMsg(ctx, msg);
} else {
enqueueRegularSessionMsg(ctx, msg);
}
}
在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect
在 MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理
org.thingsboard.server.transport.mqtt.MqttTransportHandler
void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());
String userName = msg.payload().userName();
String clientId = msg.payload().clientIdentifier();
deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));
if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
deviceSessionCtx.setProvisionOnly(true);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));
} else {
X509Certificate cert;
if (sslHandler != null && (cert = getX509Certificate()) != null) {
processX509CertConnect(ctx, cert, msg);
} else {
processAuthTokenConnect(ctx, msg);
}
}
}
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
String userName = connectMessage.payload().userName();
log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);
TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder()
.setClientId(connectMessage.payload().clientIdentifier());
if (userName != null) {
request.setUserName(userName);
}
byte[] passwordBytes = connectMessage.payload().passwordInBytes();
if (passwordBytes != null) {
String password = new String(passwordBytes, CharsetUtil.UTF_8);
request.setPassword(password);
}
transportService.process(DeviceTransportType.MQTT, request.build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
onValidateDeviceResponse(msg, ctx, connectMessage);
}
@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close();
}
});
}
2.1.2 DefaultTransportService
一路跟下去
DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer
void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);
if (messagesStats != null) {
messagesStats.incrementTotal();
}
// 将消息发送给消息队列topic是tb_transport.api.requests
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (messagesStats != null) {
messagesStats.incrementSuccessful();
}
log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
}
@Override
public void onFailure(Throwable t) {
if (messagesStats != null) {
messagesStats.incrementFailed();
}
pendingRequests.remove(requestId);
future.setException(t);
}
});
}
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢? 因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer2、怎么确定发送的主题是 tb_transport.api.requests ?
主题是通过 requestTemplate获取的
而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。
对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory
在InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate
的初始化,使用的是TbQueueTransportApiSettings的配置。
而 requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。
3、更进一步
认真分析初始化过程,得出下面请求主题的初始化图。
2.1.3 InMemoryTbQueueProducer
InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法
DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,
key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列
2.1.4 一个更抽象的发送模型
TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。
2.2 消费消息
2.2.1 InMemoryTbQueueConsumer
我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer
InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。
org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer
@Override
public List<T> poll(long durationInMillis) {
if (subscribed) {
@SuppressWarnings("unchecked")
List<T> messages = partitions
.stream()
.map(tpi -> {
try {
return storage.get(tpi.getFullTopicName());
} catch (InterruptedException e) {
if (!stopped) {
log.error("Queue was interrupted.", e);
}
return Collections.emptyList();
}
})
.flatMap(List::stream)
.map(msg -> (T) msg).collect(Collectors.toList());
if (messages.size() > 0) {
return messages;
}
try {
Thread.sleep(durationInMillis);
} catch (InterruptedException e) {
if (!stopped) {
log.error("Failed to sleep.", e);
}
}
}
return Collections.emptyList();
}
poll 方法的调用端,全局是搜不到的。
我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory。
InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。
2.2.2 InMemoryMonolithQueueFactory
我们先来看一下 InMemoryMonolithQueueFactory。InMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。
org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());
}
org.thingsboard.server.queue.settings.TbQueueTransportApiSettings
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {
// tb_transport.api.requests
@Value("${queue.transport_api.requests_topic}")
private String requestsTopic;
}
查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService
2.2.3 TbCoreTransportApiService
TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。
2.2.4 DefaultTbQueueResponseTemplate
至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。
继续往下,看看对于消息 requests,是怎么消费的。
2.2.5 DefaultTransportApiService
通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。
org.thingsboard.server.service.transport.DefaultTransportApiService
// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
if (credentials != null && credentials.getCredentialsType() == credentialsType) {
return getDeviceInfo(credentials);
} else {
return getEmptyTransportApiResponseFuture();
}
}