ThingsBoard MQTT 连接认证过程 源码分析+图例

整个连接过程如图所示:

 高清图片链接

1、环境准备

  • thingsboard3.5.1 源码启动。(不懂怎么启动的,大家可以看我的博文ThingsBoard3.5.1源码启动)
  • MQTTX 客户端(用来连接 thingsboard MQTT)
  • 默认配置。queue.type=in-memory,cache.type=caffeine

因为我们的目的,是快速了解 thingsboard 的启动过程,所以所有的配置全部采用默认的方式。默认消息队列采用内存队列ConcurrentHashMap,缓存也采用内存缓存caffeine。

使用 customerA 用户账号密码登录,使用设备A1 AccessToken 连接。

2、源码分析

2.1 连接消息生产

2.1.1 入口

大家知道MQTT是基于TCP协议之上的轻量级通信协议,而TCP协议是面向连接、请求响应的通信协议。所以在 thingsboard 这一侧必然有一个服务器实现,用来等待客户端的连接。这个实现就是MqttTransportService

thingsboard 采用 netty 来实现一个MQTT server。

org.thingsboard.server.transport.mqtt.MqttTransportService

@PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context, false))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        if (sslEnabled) {
            b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MqttTransportServerInitializer(context, true))
                    .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
            sslServerChannel = b.bind(sslHost, sslPort).sync().channel();
        }
        log.info("Mqtt transport started!");
    }

其中,关系到 netty server 性能的 bossGroupThreadCount,workerGroupThreadCount

thingsboard 提取出两个参数变量 

NETTY_BOSS_GROUP_THREADS

NETTY_WORKER_GROUP_THREADS

方便用户根据自己的设备台数、部署架构,来优化自己的 netty 性能。

 netty server 的请求处理过程如下图所示,圆圈为具体实现类,方框为方法。

在 MqttTransportHandler#processMqttMsg 方法中,因为我们的消息类型是连接,所以我们会进入 processConnect 方法。

org.thingsboard.server.transport.mqtt.MqttTransportHandler

void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            ctx.close();
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        if (CONNECT.equals(msg.fixedHeader().messageType())) {
            processConnect(ctx, (MqttConnectMessage) msg);
        } else if (deviceSessionCtx.isProvisionOnly()) {
            processProvisionSessionMsg(ctx, msg);
        } else {
            enqueueRegularSessionMsg(ctx, msg);
        }
    }

 在 MqttTransportHandler#processConnect 方法中,由于采用 AccessToken 的授权方式,所以会进入 processAuthTokenConnect

MqttTransportHandler#processAuthTokenConnect 方法中,获取我们在MQTTX填的用户名、密码,然后委托给 DefaultTransportService#process 处理

org.thingsboard.server.transport.mqtt.MqttTransportHandler

void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        log.debug("[{}][{}] Processing connect msg for client: {}!", address, sessionId, msg.payload().clientIdentifier());
        String userName = msg.payload().userName();
        String clientId = msg.payload().clientIdentifier();
        deviceSessionCtx.setMqttVersion(getMqttVersion(msg.variableHeader().version()));
        if (DataConstants.PROVISION.equals(userName) || DataConstants.PROVISION.equals(clientId)) {
            deviceSessionCtx.setProvisionOnly(true);
            ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SUCCESS, msg));
        } else {
            X509Certificate cert;
            if (sslHandler != null && (cert = getX509Certificate()) != null) {
                processX509CertConnect(ctx, cert, msg);
            } else {
                processAuthTokenConnect(ctx, msg);
            }
        }
    }

private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        String userName = connectMessage.payload().userName();
        log.debug("[{}][{}] Processing connect msg for client with user name: {}!", address, sessionId, userName);
        TransportProtos.ValidateBasicMqttCredRequestMsg.Builder request = TransportProtos.ValidateBasicMqttCredRequestMsg.newBuilder()
                .setClientId(connectMessage.payload().clientIdentifier());
        if (userName != null) {
            request.setUserName(userName);
        }
        byte[] passwordBytes = connectMessage.payload().passwordInBytes();
        if (passwordBytes != null) {
            String password = new String(passwordBytes, CharsetUtil.UTF_8);
            request.setPassword(password);
        }
        transportService.process(DeviceTransportType.MQTT, request.build(),
                new TransportServiceCallback<>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponse msg) {
                        onValidateDeviceResponse(msg, ctx, connectMessage);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
                        ctx.close();
                    }
                });
    }

2.1.2 DefaultTransportService

 一路跟下去

DefaultTbQueueRequestTemplate#sendToRequestTemplate 方法会调用 TbQueueProducer接口 send 方法,往主题 tb_transport.api.requests 发送消息。TbQueueProducer实现类是InMemoryTbQueueProducer

void sendToRequestTemplate(Request request, UUID requestId, SettableFuture<Response> future, ResponseMetaData<Response> responseMetaData) {
        log.trace("[{}] Sending request, key [{}], expTime [{}], request {}", requestId, request.getKey(), responseMetaData.expTime, request);
        if (messagesStats != null) {
            messagesStats.incrementTotal();
        }
        // 将消息发送给消息队列topic是tb_transport.api.requests
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                if (messagesStats != null) {
                    messagesStats.incrementSuccessful();
                }
                log.trace("[{}] Request sent: {}, request {}", requestId, metadata, request);
            }

            @Override
            public void onFailure(Throwable t) {
                if (messagesStats != null) {
                    messagesStats.incrementFailed();
                }
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
    }
1、TbQueueProducer 接口的实现类有很多个,具体发送消息的实现类是哪一个呢?
因为我们使用内存队列方式启动,所以实现类是 InMemoryTbQueueProducer

2、怎么确定发送的主题是 tb_transport.api.requests

主题是通过 requestTemplate获取的

而 requestTemplate又是 DefaultTbQueueRequestTemplate的一个属性,通过 Builder 构建器注入进来的。

对于DefaultTbQueueRequestTemplate的初始化,thingsboard 提供了很多基于不同种消息队列的实现方式。我们现在所用的是内存队列,所以进入InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory中,对于DefaultTbQueueRequestTemplate.requestTemplate

的初始化,使用的是TbQueueTransportApiSettings的配置。

requestsTopic 读取的,就是 tb_transport.api.requests 这一主题。

3、更进一步

认真分析初始化过程,得出下面请求主题的初始化图。

2.1.3 InMemoryTbQueueProducer

InMemoryTbQueueProducer#send 调用 DefaultInMemoryStorage#put 方法 

DefaultInMemoryStorage 往自己持有的 ConcurrentHashMap 中存放消息,

key 是主题 tb_transport.api.requests,value 是存放有消息的 LinkedBlockingQueue 内存队列

2.1.4 一个更抽象的发送模型

TbQueueProducer 往队列 queue 发送消息,主题 tb_transport.api.requests,而不管这个消息的实现是内存队列、kafka、RabbitMQ、ServiceBus 等。TbQueueConsumer 从queue中消费消息。至此,生产连接请求消息的过程结束。

2.2 消费消息

2.2.1 InMemoryTbQueueConsumer

我们知道现在消息生产者接口 TbQueueProducer 的实现类是 InMemoryTbQueueProducer,则它必然有一个消息消费者实现接口 TbQueueConsumer,消费者实现类是 InMemoryTbQueueConsumer

 InMemoryTbQueueConsumer 中对于消息的消费只有把消息从 ConcurrentHashMap 拉取出来的逻辑,而没有具体处理的逻辑,则处理的逻辑,是存在于调用这个 poll 方法的地方。

org.thingsboard.server.queue.memory.InMemoryTbQueueConsumer

@Override
    public List<T> poll(long durationInMillis) {
        if (subscribed) {
            @SuppressWarnings("unchecked")
            List<T> messages = partitions
                    .stream()
                    .map(tpi -> {
                        try {
                            return storage.get(tpi.getFullTopicName());
                        } catch (InterruptedException e) {
                            if (!stopped) {
                                log.error("Queue was interrupted.", e);
                            }
                            return Collections.emptyList();
                        }
                    })
                    .flatMap(List::stream)
                    .map(msg -> (T) msg).collect(Collectors.toList());
            if (messages.size() > 0) {
                return messages;
            }
            try {
                Thread.sleep(durationInMillis);
            } catch (InterruptedException e) {
                if (!stopped) {
                    log.error("Failed to sleep.", e);
                }
            }
        }
        return Collections.emptyList();
    }

poll 方法的调用端,全局是搜不到的。

 我们可以探究一下它的构造方法,看看谁初始化了它,则谁就有可能调用它的 poll 方法。排除掉它自己,有两个类初始化了 InMemoryTbQueueConsumer,分别是 InMemoryMonolithQueueFactory 和 InMemoryTbTransportQueueFactory

InMemoryTbTransportQueueFactory 订阅的主题,是 tb_transport.api.responses 不是我们要找的 tb_transport.api.requests,忽略。

2.2.2 InMemoryMonolithQueueFactory

我们先来看一下 InMemoryMonolithQueueFactoryInMemoryMonolithQueueFactory 里面有一个方法,传入的 TbQueueTransportApiSettings,刚好就是我们请求消息的主题配置类。

org.thingsboard.server.queue.provider.InMemoryMonolithQueueFactory
@Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
        return new InMemoryTbQueueConsumer<>(storage, transportApiSettings.getRequestsTopic());
    }


org.thingsboard.server.queue.settings.TbQueueTransportApiSettings 
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {
    // tb_transport.api.requests
    @Value("${queue.transport_api.requests_topic}")
    private String requestsTopic;
}

查看对方法 createTransportApiRequestConsumer 的调用,找到一个非具体队列实现的调用类TbCoreTransportApiService

2.2.3 TbCoreTransportApiService

TbCoreTransportApiService 初始化 init 方法,会创建 TbQueueConsumer——也就是具体的实现类 InMemoryTbQueueConsumer 注入到 DefaultTbQueueResponseTemplate.requestTemplate,然后执行 DefaultTbQueueResponseTemplate#init() 方法。

 2.2.4 DefaultTbQueueResponseTemplate

至此,我们找到了 InMemoryTbQueueConsumer#poll 调用的地方。

继续往下,看看对于消息 requests,是怎么消费的。 

 2.2.5 DefaultTransportApiService

 通过 AccessToken 查找到设备的授权 DeviceCredentials (即device_credentials表记录)然后构造 DeviceInfo 返回给设备端。

org.thingsboard.server.service.transport.DefaultTransportApiService

// credentialsId 就是 AccessToken
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
        DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
        if (credentials != null && credentials.getCredentialsType() == credentialsType) {
            return getDeviceInfo(credentials);
        } else {
            return getEmptyTransportApiResponseFuture();
        }
    }

 

2.2.6 消费消息流程图

3、总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/673157.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

接口以及会话控制

接口 接口是前后端交互通信的桥梁 简单理解&#xff1a;一个接口就是服务中的一个路由规划&#xff0c;根据请求响应结果 接口的英文单词是API&#xff08;Application Program Interface),所以有时也称之为API接口 这里的接口指的是数据接口&#xff0c;与编程语言&#x…

python基础知识点总结(第二节判断与循环)

一、判断语句 1、if判断语句 ~if语句的基本格式 if 要判断的条件&#xff1a; 条件成立时&#xff0c;要做的事情 ~if语句的注意事项&#xff1a; 判断语句的结果一定要是布尔类型不要忘记判断条件后的&#xff1a;冒号归属于if语句的代码块&#xff0c;需要在前方填…

【软件测试】6.设计测试用例的设计方法

目录 1.基于需求的设计方法 2.具体的设计方法 2.1等价类 2.2边界值 2.3正交法 2.4判定表法 2.5场景法 2.6 错误猜测法 1.基于需求的设计方法 基于需求的设计方法也是总的设计测试用例的方法&#xff0c;在工作中&#xff0c;我们需要参考需求文档/产品规格说明书来设计…

告别鼠标,安卓模拟鼠标,绘图板,手写板操作电脑PC端,卡卡罗特也说好,儿童节快乐

家人们&#xff0c;上链接了&#xff1a;https://download.csdn.net/download/jasonhongcn/89387887 横屏模式&#xff1a; 竖屏模式&#xff1a; 操作说明&#xff1a; 1. 手势滑动模拟鼠标移动 2. 界面如果有滚动条&#xff0c;右手指按紧&#xff0c;通过左手指移动实现…

【智能算法】红嘴蓝喜鹊优化算法(RBMO)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献5.代码获取 1.背景 2024年&#xff0c;S Fu受到自然界中红嘴蓝喜鹊社会行为启发&#xff0c;提出了红嘴蓝喜鹊优化算法&#xff08;Red-billed Blue Magpie Optimizer, RBMO&#xff09;。 2.算法原理 2.1算…

【原创】springboot+mysql员工管理系统

个人主页&#xff1a;程序猿小小杨 个人简介&#xff1a;从事开发多年&#xff0c;Java、Php、Python、前端开发均有涉猎 博客内容&#xff1a;Java项目实战、项目演示、技术分享 文末有作者名片&#xff0c;希望和大家一起共同进步&#xff0c;你只管努力&#xff0c;剩下的交…

找出字符串中出现最多次数的字符以及出现的次数

str.charAt(i) 是JavaScript中获取字符串中特定位置字符的方法&#xff0c;表示获取当前的字符。 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-wi…

Linux基础之进程控制--进程的创建和退出

目录 一、进程的创建 二、进程的终止 2.1 进程退出的场景 2.2 main()函数的返回值 2.3 用exit和_exit进行退出 2.4 进程异常终止 一、进程的创建 关于一个进程是如何被创建出来的&#xff0c;我们已经有所介绍这里我在给大家补充些东西。 首先&#xff0c;我们知道…

python之tkinter学习笔记

目录 Widget概览 加强版的tkinter模块 一、通用的 1.1、Label PhotoImage (gif图片加载) jpg图片加载 config方法-计时器 1.2、PanedWindow 1.3、LabelFrame 二、tkinter专有 2.1、Menu菜单复选框 2.2、工具栏 三、tkinter.ttk专有 3.1、Separator分隔线 3.2、n…

【C++】C++ 宾馆酒店管理系统(源码+数据+论文)【独一无二】(史上功能最全,拿来即用)

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

LeetCode-47. 全排列 II【】

TOC 题目描述&#xff1a; 给定一个可包含重复数字的序列 nums &#xff0c;按任意顺序 返回所有不重复的全排列。 示例 1&#xff1a; 输入&#xff1a;nums [1,1,2] 输出&#xff1a; [[1,1,2], [1,2,1], [2,1,1]] 示例 2&#xff1a; 输入&#xff1a;nums [1,2,3] 输…

InnoDB Data Locking - Part 2.5 “Locks“ (Deeper dive)

All together now 现在让我们把在 InnoDB Data Locking – Part 2 “Locks” 中学习到的有关表锁和记录锁的知识结合起来&#xff0c;以理解以下情况&#xff1a; 我们看到&#xff1a; 第一个 SELECT * FROM t FOR SHARE; 在 5、10、42 和 supremum 伪记录上创建了 S 锁。…

【软件设计师】2022年上半年真题解析

​​冯诺依曼计算机体系结构的基本特点是&#xff1a; A. 程序指令和数据都采用二进制表示 - 这是正确的&#xff0c;因为冯诺依曼架构下的计算机使用二进制形式来表示和处理所有信息&#xff0c;包括指令和数据。 B. 程序指令总是存储在主存中&#xff0c;而数据则存储在高速…

Discuz_X3.5各行业资料文档下载模板

下载地址&#xff1a;Discuz_X3.5各行业资料文档下载模板

验证外星语词典

在解决算法题时&#xff0c;哈希表是经常被使用的工具&#xff0c;可以用来记录字符串中字母出现的次数&#xff0c;字符串中字符出现的位置等&#xff0c;这里用到的就是利用哈希表储存字符串中字符出现的的位置。 “外星语”的字母表顺序是不一样的&#xff0c;所以…

python数据预处理

PYTHON 最流行库&#xff1a;Numpy、Matplotlib 和 Pandas。Numpy 是满足所有数学运算所需要的库&#xff0c;由于代码是基于数学公式运行的&#xff0c;因此就会使用到它。Maplotlib&#xff08;具体而言&#xff0c;Matplotlib.pyplot&#xff09;则是满足绘图所需要的库。Pa…

创建一个支持切换阅读模式和答题模式的Anki问答题模板

为了备考某个需要默写的科目&#xff0c;做了个问答题笔记模板&#xff0c;如下&#xff1a; 在上图的回答栏填写答案后&#xff0c;点击显示答案按钮转到背面&#xff1a; 只实现上面的功能是很简单的&#xff0c;直接基于Anki自带的问答题模板添加自己需要的字段即可。问题…

嵌入式工程师人生提质的十大成长型思维分享

大家好,作为一名嵌入式开发者,很多时候,需要考虑个人未来的发展,人生旅途复杂多变,时常面临各种各样的挑战。如何在这个复杂多变的社会中稳步向前,不断成长,成为每个人都应该思考的问题。实际上,思维方式的差异决定我们应对挑战的能力与成长的速度。 第一:寻找自我坐…

Linux CPU占用高问题

1、top命令和历史系统监控找出持续占用CPU的top5进程。 # top -c 依次按键盘键b->x&#xff0c;按照CPU由高到底排序。shiftP和shiftM切换使用CPU和内存的排序 # top -Hp pid 查看进程中所有线程的占用情况 # top -Hp 1124 -n 1 -b | awk {print $NF} | sort | uniq -c |…

Python | 自动探索性数据分析(EDA)库SweetViz

SweetViz是一个开放源代码Python库&#xff0c;主要用于生成精美的高密度可视化文件&#xff0c;启动探索性数据分析&#xff08;EDA&#xff09;&#xff0c;输出为完全独立的HTML应用程序。 探索性数据分析&#xff08;EDA&#xff09;是分析和总结数据集主要特征的过程&…