目录
MQTT源码分析
1. MQTT客户端功能
2. 客户端软件如何实现
3. 程序分层
4. 情景分析
4.1 连接服务器
4.2 创建线程
4.3 发布消息
4.4 最复杂:订阅消息
MQTT源码分析
分析源码:mqttclient\test\emqx\test.c
参考资料:
-
kawaii-mqtt源码:
-
作者发布源码:GitHub - jiejieTop/mqttclient: A high-performance, high-stability, cross-platform MQTT client, developed based on the socket API, can be used on embedded devices (FreeRTOS / LiteOS / RT-Thread / TencentOS tiny), Linux, Windows, Mac, with a very concise The API interface realizes the quality of service of QOS2 with very few resources, and seamlessly connects the mbedtls encryption library.
-
大牛维护的:GitHub - longtengmcu/kawaii-mqtt: 基于socket API的MQTT客户端,以极少的资源实现qos2服务质量,并且实现mbedtls支持,此仓库是专门为RT-Thread做的软件包,原始仓库位于:https://github.com/jiejieTop/mqttclient
-
-
博客
-
作者博客:
-
你不得不看的图文并茂的MQTT协议通信过程!!!
-
MQTT协议简介及协议原理
-
mqttclient设计与实现方式
-
-
大牛笔记:
-
记一次解决MQTT软件包内存泄露的心路历程
-
-
-
APP
-
Download | Eclipse Mosquitto
-
MQTTX:全功能 MQTT 客户端工具
-
1. MQTT客户端功能
MQTT通信模型示意图如下:
以"记者-电视台-观众"的模式来理解,客户端具体的流程是这样的:
-
客户端1:观众打电话到电视台:connect
-
客户端1:观众向电视台订阅"财经新闻": Subscribe 某个 Topic
-
客户端2:记者打电话到电视台:connect
-
客户端2:记者向电视台发布"财经新闻":Public某个Topic的某个Playload
-
服务器:电视台向"订阅了财经新闻的观众"发布"某条消息":Public某个Playload给Subscriber
整个过程中,电视台和记者、电视台和观众直接的电话要保存连接状态,还要时不时确认一下:
-
记者要时不时给电视台喊一声"喂":确保电视台还正常
-
观众要时不时给电视台喊一声"喂":确保电视台还正常
2. 客户端软件如何实现
-
连接服务器
-
订阅:
-
发布订阅请求,等待回应
-
循环:读取Publish信息(得到订阅的信息),处理
-
-
发布:
-
发送数据包即可
-
-
PING
-
循环:确保自己、对方还活着
-
mqtt_packet_handle > mqtt_keep_alive
-
需要一个循环!
3. 程序分层
至少可以分为3层:
-
最上层:APP
-
中间层:MQTT
-
平台层:实现多线程、定时器、网卡收发数据
4. 情景分析
4.1 连接服务器
函数调用过程:
main
client = mqtt_lease();
mqtt_set_port(client, "1883");
mqtt_set_host(client, "www.jiejie01.top");
mqtt_connect(client);
mqtt_connect_with_results(c);
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
rc = network_connect(c->mqtt_network);
nettype_tcp_connect(n);
platform_net_socket_connect
4.2 创建线程
调用过程:
main
mqtt_connect(client);
mqtt_connect_with_results(c);
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
rc = network_connect(c->mqtt_network);
/* send connect packet */
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
goto exit;
if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
}
/* connect success, and need init mqtt thread */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);
4.3 发布消息
调用过程:
main
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
mqtt_publish_thread
mqtt_publish(client, "topic1", &msg);
// 1. 构造消息
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf;
msg.payloadlen = xxx;
mqtt_publish(client, "topic1", &msg);
// 1.1 根据MQTT协议构造数据包
// 1.2 根据平台相关的函数发送数据包
mqtt_send_packet
network_write
nettype_tcp_write
platform_net_socket_write_timeout
4.4 最复杂:订阅消息
消息何时到来?不知道!
所以,必定是某个内核线程不断查询网卡:
-
读网卡数据
-
得到数据的话就判断、处理
-