RT-Thread中使用Mqtt

环境:

开发板:Panduola(stm32L475)
KEIL5 开发环境
rtthread 4.0.3内核
使用ENV 配置Rtt


MQTT

1.MQTT介绍
客户端 Client

  • 使用MQTT的程序或设备。客户端总是通过网络连接到服务端。
  • 它可以发布应用消息给其它相关的客户端。
  • 订阅以请求接受相关的应用消息。
  • 取消订阅以移除接受应用消息的请求。
  • 从服务端断开连接。服务端 Server一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。

服务端

  • 接受来自客户端的网络连接。

  • 接受客户端发布的应用消息。

  • 处理客户端的订阅和取消订阅请求。

  • 转发应用消息给符合条件的已订阅客户端。

订阅 Subscription

  • 订阅包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅与单个会话(Session)关联。会话可以包含多于一个的订阅。会话的每个订阅都有一个不同的主题过滤器。

  • QoS0,At most once,至多一次;Sender 发送的一条消息,Receiver 最多能收到一次,如果发送失败,也就算了。

  • QoS1,At least once,至少一次;Sender 发送的一条消息,Receiver 至少能收到一次,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但Receiver 有可能会收到重复的消息

  • QoS2,Exactly once,确保只有一次。Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。
    2.MQTT协议数据包结构

一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、有效载荷(payload)三部分构成。

(1) 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
(2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
(3)有效载荷(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

操作:

添加AT指令部分,使用ESP8266连接网络:

RT-Thread online packages  --->
  IoT - internet of things  --->
  [*] AT DEVICE: RT-Thread AT component porting or samples for different device
  [*]   Espressif ESP8266  --->

在这里插入图片描述

1. 添加RTT组件包:

MQTT
在这里插入图片描述
CJSON
在这里插入图片描述
AHT10:使用AHT10还需要打开I2c的驱动部分,使用旧版本1.0的AHT驱动,可以避免使用Sensor的框架
在这里插入图片描述
启用I2c总线:
在这里插入图片描述

2.使用scons --target=mdk5生成mdk工程

在这里插入图片描述

3.使用MQtt客户端:

修改连接参数:

#define MQTT_URI “tcp://192.168.1.110:1883”
#define MQTT_USERNAME “panduola”
#define MQTT_PASSWORD “panduola”
#define MQTT_SUBTOPIC “/test/topic2”
#define MQTT_PUBTOPIC “/test/topic1”

可在rtconfig.h中修改wifi连接参数:
在这里插入图片描述

  1. 创建一个客户端:
    static MQTTClient client;

  2. 初始化客户端:


/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{
    /* 初始 condata 参数 */
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = {0};

    static int is_started = 0;
    if (is_started)
    {
        return;
    }
    /* 配置 MQTT 文本参数 */
    {
        client.isconnected = 0;
        client.uri = MQTT_URI;

        /* 生成随机客户端 ID */
        rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());

        // rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);
        /* 配置连接参数 */
        memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));
        memcpy(&client.condata, &condata, sizeof(condata));
        client.condata.clientID.cstring = cid;
        client.condata.keepAliveInterval = 60;
        client.condata.cleansession = 1;
        client.condata.username.cstring = MQTT_USERNAME;
        client.condata.password.cstring = MQTT_PASSWORD;

        /* 配置 mqtt 参数 */
        client.condata.willFlag = 0;
        client.condata.will.qos = 1;
        client.condata.will.retained = 0;
        client.condata.will.topicName.cstring = sup_pub_topic;

        client.buf_size = client.readbuf_size = 1024;
        client.buf = malloc(client.buf_size);
        client.readbuf = malloc(client.readbuf_size);
        if (!(client.buf && client.readbuf))
        {
            LOG_E("no memory for MQTT client buffer!");
            goto _exit;
        }

        /* 设置事件回调 */
        client.connect_callback = mqtt_connect_callback;
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;
        /* 设置要订阅的 topic 和 topic 对应的回调函数 */
        client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
        client.messageHandlers[0].callback = mqtt_sub_callback;
        client.messageHandlers[0].qos = QOS1;

        /* 设置默认订阅回调函数 */
        client.defaultMessageHandler = mqtt_sub_default_callback;
    }

    /* 启动 MQTT 客户端 */
    LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
    paho_mqtt_start(&client);
    is_started = 1;

_exit:
    return;
}

  1. 设置收到信息后的回调:
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
    printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len,msg_data->topicName->lenstring.data);
    // 解析JSON消息
    cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);
    if (json_obj == NULL)
    {
        printf("Failed to parse JSON message\n");
        return;
    }
    cJSON *object = RT_NULL;
    object = cJSON_GetObjectItem(json_obj, "location");
    if (object != NULL)
    {

        printf("Location: %s\n", object->valuestring);
    }
    object = cJSON_GetObjectItem(json_obj, "led");
    if (object != NULL)
    {
        if(object->type == cJSON_True)
        printf("led: ture\n");
        else
        printf("led: false\n");
    }

    cJSON_Delete(json_obj);

    return;
}
  1. 发布消息:

/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{
    MQTTMessage message;
    const char *msg_str = send_str;
    const char *topic = MQTT_PUBTOPIC;
    message.qos = QOS1;
    message.retained = 0;
    message.payload = (void *)msg_str;
    message.payloadlen = strlen(message.payload);

    MQTTPublish(&client, topic, &message);

    return;
}

常用的cjson函数

void cJSON_Delete(cJSON *c)

删除 cJSON 指针,释放空间

char *cJSON_Print(cJSON *item)

cJSON数据解析成JSON字符串,并会在堆中开辟一块char *的内存空间,存放JSON字符串。
函数成功后会返回一个char *指针,该指针指向位于堆中JSON字符串。

cJSON *cJSON_Parse(const char *value)

将一个JSON数据包,按照cJSON结构体的结构序列化整个数据包,并在堆中开辟一块内存存储cJSON结构体
返回值:成功返回一个指向内存块中的cJSON的指针,失败返回NULL

cJSON *cJSON_GetObjectItem(cJSON *object,const char *string)

获取JSON字符串字段值,成功返回一个指向cJSON类型的结构体指针,失败返回NULL

常用的mqtt的API

/**
 * This function send an MQTT subscribe packet and wait for suback before returning.
 *
 * @param client the pointer of MQTT context structure
 * @param qos MQTT Qos type, only support QOS1
 * @param topic topic filter name
 * @param callback the pointer of subscribe topic receive data function
 *
 * @return the error code, 0 on start successfully.
 */
 /*订阅主题,最后一个形参是void (*subscribe_cb)(MQTTClient *client, MessageData *data);类型的函数指针*/
int paho_mqtt_subscribe(MQTTClient *client, enum QoS qos, const char *topic, subscribe_cb callback);
/**
 * This function publish message to specified mqtt topic.
 * @note it will be discarded, recommend to use "paho_mqtt_publish"
 *
 * @param c the pointer of MQTT context structure
 * @param topicFilter topic filter name
 * @param message the pointer of MQTTMessage structure
 *
 * @return the error code, 0 on subscribe successfully.
 */
int MQTTPublish(MQTTClient *client, const char *topic, MQTTMessage *message);

source:

#include <rtthread.h>
#include <rtdevice.h>
#include <board.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include "paho_mqtt.h"
// #include "wifi_config.h"

#include "aht10.h"
#define DBG_TAG "main"
#define DBG_LVL DBG_LOG
#include <rtdbg.h>

#include <cJSON.h>

/**
 * MQTT URI farmat:
 * domain mode
 * tcp://iot.eclipse.org:1883
 *
 * ipv4 mode
 * tcp://192.168.10.1:1883
 * ssl://192.168.10.1:1884
 *
 * ipv6 mode
 * tcp://[fe80::20c:29ff:fe9a:a07e]:1883
 * ssl://[fe80::20c:29ff:fe9a:a07e]:1884
 */
#define MQTT_URI "tcp://192.168.1.110:1883"
#define MQTT_USERNAME "panduola"
#define MQTT_PASSWORD "panduola"
#define MQTT_SUBTOPIC "/test/topic2"
#define MQTT_PUBTOPIC "/test/topic1"

#define LED_PIN GET_PIN(E, 8)

/* define MQTT client context */
static MQTTClient client;
static void mq_start(void);
static void mq_publish(const char *send_str);

char sup_pub_topic[48] = MQTT_PUBTOPIC;
char sup_sub_topic[48] = MQTT_SUBTOPIC;

static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
    printf("Receive message ,Topic: %.*s,payload:\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);
    // 解析JSON消息
    cJSON *json_obj = cJSON_Parse((const char *)msg_data->message->payload);
    if (json_obj == NULL)
    {
        printf("Failed to parse JSON message\n");
        return;
    }
    cJSON *object = RT_NULL;
    object = cJSON_GetObjectItem(json_obj, "location");
    if (object != NULL)
    {

        printf("Location: %s\n", object->valuestring);
    }
    object = cJSON_GetObjectItem(json_obj, "led");
    if (object != NULL)
    {
        if (object->type == cJSON_True)
        {
            printf("led: ture\n");
            rt_pin_write(LED_PIN, PIN_LOW);
        }

        else
        {
            printf("led: false\n");
            rt_pin_write(LED_PIN, PIN_HIGH);
        }
    }

    cJSON_Delete(json_obj);

    return;
}

static void mqtt_sub_default_callback(MQTTClient *c, MessageData *msg_data)
{
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
    LOG_D("mqtt sub default callback: %.*s %.*s",
          msg_data->topicName->lenstring.len,
          msg_data->topicName->lenstring.data,
          msg_data->message->payloadlen,
          (char *)msg_data->message->payload);
    return;
}

static void mqtt_connect_callback(MQTTClient *c)
{
    LOG_I("Start to connect mqtt server");
}

static void mqtt_online_callback(MQTTClient *c)
{
    LOG_D("Connect mqtt server success");
    LOG_D("Publish message: Hello,RT-Thread! to topic: %s", sup_pub_topic);
    mq_publish("Hello,RT-Thread!");
}

static void mqtt_offline_callback(MQTTClient *c)
{
    LOG_I("Disconnect from mqtt server");
}

/* 创建与配置 mqtt 客户端 */
static void mq_start(void)
{
    /* 初始 condata 参数 */
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = {0};

    static int is_started = 0;
    if (is_started)
    {
        return;
    }
    /* 配置 MQTT 文本参数 */
    {
        client.isconnected = 0;
        client.uri = MQTT_URI;

        /* 生成随机客户端 ID */
        rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());

        // rt_snprintf(sup_pub_topic, sizeof(sup_pub_topic), "%s%s", MQTT_PUBTOPIC, cid);
        /* 配置连接参数 */
        memcpy(sup_pub_topic, MQTT_SUBTOPIC, sizeof(MQTT_SUBTOPIC));
        memcpy(&client.condata, &condata, sizeof(condata));
        client.condata.clientID.cstring = cid;
        client.condata.keepAliveInterval = 60;
        client.condata.cleansession = 1;
        client.condata.username.cstring = MQTT_USERNAME;
        client.condata.password.cstring = MQTT_PASSWORD;

        /* 配置 mqtt 参数 */
        client.condata.willFlag = 0;
        client.condata.will.qos = 1;
        client.condata.will.retained = 0;
        client.condata.will.topicName.cstring = sup_pub_topic;

        client.buf_size = client.readbuf_size = 1024;
        client.buf = malloc(client.buf_size);
        client.readbuf = malloc(client.readbuf_size);
        if (!(client.buf && client.readbuf))
        {
            LOG_E("no memory for MQTT client buffer!");
            goto _exit;
        }

        /* 设置事件回调 */
        client.connect_callback = mqtt_connect_callback;
        client.online_callback = mqtt_online_callback;
        client.offline_callback = mqtt_offline_callback;
        /* 设置要订阅的 topic 和 topic 对应的回调函数 */
        client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
        client.messageHandlers[0].callback = mqtt_sub_callback;
        client.messageHandlers[0].qos = QOS1;

        /* 设置默认订阅回调函数 */
        client.defaultMessageHandler = mqtt_sub_default_callback;
    }

    /* 启动 MQTT 客户端 */
    LOG_D("Start mqtt client and subscribe topic:%s", sup_pub_topic);
    paho_mqtt_start(&client);
    is_started = 1;

_exit:
    return;
}

/* MQTT 消息发布函数 */
static void mq_publish(const char *send_str)
{
    MQTTMessage message;
    const char *msg_str = send_str;
    const char *topic = MQTT_PUBTOPIC;
    message.qos = QOS1;
    message.retained = 0;
    message.payload = (void *)msg_str;
    message.payloadlen = strlen(message.payload);

    MQTTPublish(&client, topic, &message);

    return;
}

rt_thread_t TH_Get_HU;
rt_thread_t Publish_value;
float humidity, temperature;

void get_humi_temp(void *parameter)
{

    aht10_device_t dev;

    const char *i2c_bus_name = "i2c2";
    int count = 0;

    rt_thread_mdelay(2000);

    dev = aht10_init(i2c_bus_name);
    if (dev == RT_NULL)
    {
        LOG_E(" The sensor initializes failure");
    }

    while (count++ < 100)
    {

        humidity = aht10_read_humidity(dev);
        // LOG_D("humidity   : %d.%d %%", (int)humidity, (int)(humidity * 10) % 10);

        temperature = aht10_read_temperature(dev);
        // LOG_D("temperature: %d.%d", (int)temperature, (int)(temperature * 10) % 10);

        rt_thread_mdelay(1000);
    }
}

void Publish_Date(void *parameter)
{
    char send_str[128];
    while (1)
    {
        // sprintf(send_str, "{\"temperature\":%d.%d,\"humidity\":%d.%d}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);

        sprintf(send_str, "{\"location\":\"10#A401\",\"led\":true,\"environment\":{\"temperature\":%d.%d,\"humidity\":%d.%d}}", (int)temperature, (int)(temperature * 10) % 10, (int)humidity, (int)(humidity * 10) % 10);
        mq_publish(send_str);
        rt_thread_mdelay(1000);
    }
}

int main(void)
{
    mq_start();
    TH_Get_HU = rt_thread_create("get_humi_temp", get_humi_temp, RT_NULL, 1024, 20, 10);
    Publish_value = rt_thread_create("Publish_value", Publish_Date, RT_NULL, 1024, 20, 10);
    rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);
    rt_thread_startup(TH_Get_HU);
    rt_thread_mdelay(1000);
    rt_thread_startup(Publish_value);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/629098.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python代码:三、读入字符串

1、题目 从变量输出开始。请使用input函数读入一个字符串&#xff0c;然后将其输出。 2、代码 import sys ainput() print(a) 3、在sublime运行的结果

客观需求验证的常见5大步骤(实施版)

我们在挖掘用户需求时&#xff0c;往往容易犯伪需求或需求错位等问题&#xff0c;因此需要进行客观需求验证。通过客观的验证&#xff0c;我们可以有效减少主观判断误差问题&#xff0c;确保需求的准确性&#xff0c;从而降低需求变更和项目风险的概率&#xff0c;减少开发成本…

intel三年来首次大更新竟然倒吸牙膏,线程数砍掉25%!

每年科技圈最热闹的几个话题&#xff0c;无非是几大科技公司发布新的产品&#xff0c;那这其中必然有核心巨头 intel 的身影。 据外媒 Benchlife 披露&#xff0c;英特尔计划在其 Arrow Lake-S 架构 Core Ultra 200 台式机 CPU 系列中推出共计 21 款 CPU。 这是 intel 首次在桌…

白酒:酒精度数对白酒风味的影响与品鉴技巧

云仓酒庄豪迈白酒作为品质的白酒品牌&#xff0c;其酒精度数对白酒风味的影响与品鉴技巧是品鉴爱好者关注的重点。酒精度数作为衡量白酒质量的一项重要指标&#xff0c;不仅决定了白酒的口感和风格&#xff0c;更在一定程度上体现了白酒的品质和价值。本文将探讨酒精度数对云仓…

Jmeter中线程组介绍

1.线程数的意义 Jmeter采用了线程来模拟用户&#xff0c;即1个线程代表1个用户&#xff0c;线程可以简单理解为计算机处理任务时的一个具体执行人。 一个任务可以由多个人&#xff08;线程&#xff09;共同完成&#xff0c;也可以由一个人&#xff08;线程&#xff09;来完成&a…

WebLogic SSL应用

SSL 安全套接字层(SSL)是通过在客户端和Web服务器端之间进行身份验证,并对双方交换的数据进行加密,从而提供安全连接。 验证类型: 单向:客户端验证Web服务器端证书 双向:客户端验证Web服务器证书, Web服务器验证客户端证书 Weblogic Server12c 支持 SSL 3.0 和 TLS1.0 …

智能防疫电梯模拟控制系统设计-设计说明书

设计摘要&#xff1a; 本设计是基于单片机的智能防疫电梯模拟控制系统&#xff0c;主要实现了多项功能。首先&#xff0c;系统进行无接触测温&#xff0c;如果温度正常则可以启动电梯运行&#xff0c;如果温度异常则电梯会报警提示有乘客体温异常&#xff0c;电梯不会运行。其…

Java | Leetcode Java题解之第88题合并两个有序数组

题目&#xff1a; 题解&#xff1a; class Solution {public void merge(int[] nums1, int m, int[] nums2, int n) {int p1 m - 1, p2 n - 1;int tail m n - 1;int cur;while (p1 > 0 || p2 > 0) {if (p1 -1) {cur nums2[p2--];} else if (p2 -1) {cur nums1[p…

使用单目相机前后帧特征点匹配进行3D深度估计的方法

在计算机视觉和机器人领域&#xff0c;三维空间感知是实现环境理解和交互的核心技术之一。特别是在资源受限的场合&#xff0c;使用针孔模型的单目相机进行深度估计成为了一种既经济又实用的解决方案。单目深度估计技术依赖于从连续视频帧中提取和匹配特征点&#xff0c;以估计…

dbeaver 链接 Oceanbase 数据库,dbeaver安装数据库驱动

新增驱动 提前到Oceanbase官网下载好驱动 1、点击数据库 -> 驱动管理器 -> 新建 2、添加驱动文件 联接数据库 1、选择你添加的驱动 2、测试

CST电磁仿真软件远场变更和结果相关【从入门到精通】

1、使用阵列系数计算阵列远场结果 对单一天线进行 仿真分析后&#xff0c;查看反映阵列系数的远场结果&#xff01; Navigation Tree > Farfields > Selection > Farfield Plot > Array Factor 下面介绍一下&#xff0c;对单一天线进行仿真后&#xff0c;轻松计…

小白必看:新手学编程必会的100个代码

前言 我记得刚开始接触编程的时候&#xff0c;觉得太难了。 也很好奇&#xff0c;写代码的那些人也太厉害了吧&#xff1f;全是英文的&#xff0c;他们的英文水平一定很好吧&#xff1f; 他们是怎么记住这么多代码格式的&#xff1f;而且错了一个标点符号&#xff0c;整个程…

线性模型之岭回归的用法

实战&#xff1a;使用岭回归模型 完整代码&#xff1a; import numpy as np import matplotlib.pyplot as plt from sklearn.linear_model import LinearRegression from sklearn.datasets import make_regression from sklearn.model_selection import train_test_split fro…

C语言收尾 预处理相关知识

一. 预处理详解 1.1 预定义符号 FILE //进行编译的源文件LINE //文件当前的行号DATE //文件被编译的日期TIME //文件被编译的时间FUNCTION //文件当前所在的函数STDC //如果编译器遵循ANSI C标准&#xff0c;其值为1&#xff0c;否则未定义 这些预定义符号都是语言内置的 我们…

【教学类-55-04】20240515图层顺序挑战(四格长条纸加黑色边框、4*4、7张,不重复5400张,16坐标点颜色哈希值去重、保留7色)

背景需求&#xff1a; 前文实现了7张色彩纸条加上黑色边框的需求。 【教学类-55-02】20240512图层顺序挑战&#xff08;四格长条纸加黑色边框、4*4、7张 、43200张去掉非7色有23040张&#xff0c;哈希算法快速去重剩余1221张&#xff09;-CSDN博客文章浏览阅读1k次&#xff0…

反序列化漏洞【1】

1.不安全的反序列化漏洞介绍 序列化&#xff1a;将对象转换成字符串&#xff0c;目的是方便传输&#xff0c;关键词serialize a代表数组&#xff0c;数组里有三个元素&#xff0c;第一个元素下标为0&#xff0c;长度为7&#xff0c;内容为porsche&#xff1b;第二个元素下标为1…

工作达人的小秘密

在快节奏的工作环境中&#xff0c;想要提升效率&#xff0c;保持头脑清晰&#xff1f;别急&#xff0c;我这就为你揭秘我的几大法宝&#xff0c;让我们一起探索它们如何助你事半功倍&#xff01; 1️⃣【亿可达】 它是一款自动化工具&#xff0c;相当于国内版免费的zaiper。它…

新手也能看懂的前端单元测试框架:Vitest

单元测试的概念及作用 1.什么是单元测试&#xff1f; 单元测试是测试中的一个重要环节&#xff0c;它针对软件中的最小可测试单元进行验证&#xff0c;通常是指对代码中的单个函数、方法或模块进行测试。 单元测试旨在确定特定部分代码的行为是否符合预期&#xff0c;通过针…

LearnOpenGL(十八)之面剔除

一、面剔除 对于一个3D立方体&#xff0c;无论我们从哪个方向&#xff0c;最多只能同时看到3个面。如果我们能够以某种方式丢弃另外几个看不见的面&#xff0c;我们就能省下超过50%的片段着色器执行数&#xff01; 这正是面剔除(Face Culling)所做的。OpenGL能够检查所有面向…

在Linux系统上使用nmcli命令配置各种网络(有线、无线、vlan、vxlan、路由、网桥等)

前言&#xff1a;原文在我的博客网站中&#xff0c;持续更新数通、系统方面的知识&#xff0c;欢迎来访&#xff01; 在Linux系统上使用nmcli命令配置各种网络&#xff08;有线、无线、vlan、vxlan等&#xff09;https://myweb.myskillstree.cn/123.html 更新于2024/5/13&…