今天非常荣幸向各位小伙伴详细展示一个由共创社成员完成的MQTT远程温湿度监控系统项目。该项目借助ELF 1开发板作为核心技术支撑,成功实现了对各类环境空间中温湿度数据的实时、远程、稳定监测。该系统不仅集成了先进的数据采集模块,用于精确感知现场环境变化,同时利用MQTT协议的轻量级特性,确保了数据在复杂网络环境下的可靠传输。在此接下来,就为各位小伙伴详尽展示这一项目的相关细节
1、Linux开发板开发环境搭建
(1)开发板动态分配ip地址
(开发板与家用路由器连接,路由器支持DHCP自动IP地址分配)
root@ELF1:~# udhcpc -i eth0
(2)将nfs服务器挂载到开发板的/mnt目录
(其中“192.168.1.10”是Ubuntu的ens36的ip)
root@ELF1:~# mount -t nfs -o nolock,vers=3 192.168.1.10:/home/book/nfs_rootfs /mnt
可以看到开发板的/mnt目录已经有了文件 。
2、Ubuntu编译环境搭建:
(1)将paho mqtt的官方库克隆到Ubuntu的“~/nfs_rootfs”路径
book@100ask:~/nfs_rootfs$ git clone https://github.com/eclipse/paho.mqtt.c.git
(2)修改“~/nfs_rootfs/paho_mqtt/paho.mqtt.c”路径下的Makefile文件
修改prefix所代表的工具链路径:
修改编译器:
(3)编译后得到链接库
book@100ask:~/nfs_rootfs/paho_mqtt/paho.mqtt.c$ make
arm-gcc编译生成的.so库文件,保存在paho.mqtt.c/build/output里面。
将.so库文件安装到本地PC
book@100ask:~/nfs_rootfs/paho_mqtt/paho.mqtt.c$ sudo make install
(4)将.so库文件安装到开发板的“/lib”路径,开发板才能运行paho mqtt编译后的可执行文件
root@ELF1:~# install /mnt/paho_mqtt/paho.mqtt.c/build/output/libpaho-mqtt3* /lib
以上开发环境搭建完成!
3、工程文件的建立
(1)将“~/nfs_rootfs/paho_mqtt/paho.mqtt.c”
路径中的src文件夹拷贝到“~/nfs_rootfs/mqtt_iot”路径下
book@100ask:~$ cp -r ~/nfs_rootfs/paho_mqtt/paho.mqtt.c ~/nfs_rootfs/mqtt_iot
(2)在“~/nfs_rootfs/mqtt_iot”路径下添加文件:
4、阿里云服务器设置:
(1)服务器的注册及产品、设备设置可以参照ElfBoard官方文档:《01-1 ELF1、ELF1S开发板_软件教程_V1》第5章-5.4小节。
(2)工程文件中所需要的服务器参数查找
查看服务器地址
查看DeviceName、DeviceSecret
查看MQTT连接参数
5.工程文件的编译执行
(1)编译工程文件,得到可在Linux开发板上执行的Main二进制文件
book@100ask:~/nfs_rootfs/mqtt_iot$ arm-buildroot-linux-gnueabihf-gcc main.c mqtt_iot.c -o main -lpaho-mqtt3c -lpthread
MQTT的异步通信收发,依赖的库是libpaho-mqtt3a,MQTT的同步通信收发,依赖的库就是libpaho-mqtt3c。此外工程编译的时候需要链接线程的库pthread,所以编译的时候要加上-lpthread。
(2)开发板运行程序
root@ELF1:/mnt/mqtt_iot# ./main
(3)实验结果
Linux开发板将采集到的温度、湿度数据每5s上传一次阿里服务器,串口窗口显示数据发送成功字符。此外通过阿里服务器日志服务可以看到湿度、温度数据。
通过阿里服务器调试窗口给开发板发送LED1、LED2控制指令。
6、下面将贴出工程文件的代码,并介绍其思路
(1)main.c文件
//main.c
//定义线程句柄
pthread_t discon_t;
pthread_t thread_AT20Read_t;
pthread_t thread_ledctrl_t;
static int isConnected = 0;//表明客户端和服务器是断开还是连接状态(1-连接状态,-1断开状态)
static void *thread_AT20Read(void *paramater)
{
int fd = -1;
unsigned int databuf[2];
int c1,t1;
float hum,temp;
int ret = 0;
msgbuf pubMsg = {2, 0};
while(fd < 0){
fd = open(AHT20_DEV, O_RDWR);
if(fd < 0){
printf("can't open file %s\r\n", AHT20_DEV);
sleep(1);
}else{
printf("open file %s successfully\r\n", AHT20_DEV);
}
}
while(1){
ret = read(fd, databuf, sizeof(databuf));
if(ret == 0){
c1 = databuf[0]*1000/1024/1024;
t1 = databuf[1] *200*10/1024/1024-500;
hum = (float)c1/10.0;
temp = (float)t1/10.0;
//printf("hum = %0.2f temp = %0.2f \r\n",hum,temp);
pubMsg.mtext[0] = (unsigned int)(hum*100);
pubMsg.mtext[1] = (unsigned int)(temp*100);
int ret1 = msgsnd(pubmsg_d, &pubMsg.mtype, sizeof(pubMsg.mtext), IPC_NOWAIT); // 非阻塞发送
if(ret1 != 0)
{
printf("Failed to send message.\r\n");
}
}
sleep(5);
}
}
static void *thread_ledctrl(void *paramater)
{
int on=1;
int led;
int fd;
msgbuf subMsg = {1, 0};
fd = open(LED_BRIGHTNESS, O_WRONLY);
if(fd < 0)
{
perror("open device leds");
exit(1);
}
system(LED1_OFF);
system(LED2_OFF);
while(1)
{
int res = msgrcv(submsg_d, &subMsg, sizeof(subMsg.mtext), 0, 0);//阻塞
if(res < 0)
continue;
else{
if((subMsg.mtext[0] & 0x01)== 1){
system(LED1_ON);
}else{
system(LED1_OFF);
}
if((subMsg.mtext[0] & 0x02)== 0x02){
system(LED2_ON);
}else{
system(LED2_OFF);
}
}
}
}
//断开和mqtt服务器连接的线程入口函数
static void *mqtt_disconnect_t(void* argv)
{
int retval;
while(1)
{
char ch;
ch = getchar();
if(ch=='Q' || ch=='q')
{
printf("Try to exit mqtt task\n");
if(mqtt_disconnect() == EXIT_SUCCESS) break;
}
}
isConnected = -1;
pthread_exit(&retval); // 退出线程
return NULL;
}
int main(void)
{
//初始化mqtt成功建立客户端和服务器的连接后,将主动断开服务器的任务放到一个线程里面去
//成功建立客户端和服务器的连接且订阅主题后才创建断开连接的线程
if(mqtt_iot() == 0)
{
isConnected = 1;
pthread_create(&discon_t, 0, mqtt_disconnect_t, NULL);
}
//AT20 read thread
int ret = pthread_create(&thread_AT20Read_t, NULL, thread_AT20Read, NULL);
if(ret != 0)
{
printf("Failed to create AT20Read thread.\n");
return -1;
}
//led control thread
ret = pthread_create(&thread_ledctrl_t, NULL, thread_ledctrl, NULL);
if(ret != 0)
{
printf("Failed to create ledctrl thread.\n");
return -1;
}
while(1)
{
//printf("isConnected state:%d\n",isConnected);
sleep(5);
}
return 0;
}
(2)mqtt_iot.c文件
//mqtt_iot.c
volatile MQTTClient_deliveryToken deliveredtoken;
pthread_t threads[2];
sem_t discon_sem;//信号量
int pubmsg_d = -1;
int submsg_d = -1;
msgbuf subMsg = {1, 0};
msgbuf pubMsg = {2, 0};
pthread_t thread_mqtt_publish_t;
MQTTClient client; //定义一个MQTT客户端client
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//传递给MQTTClient_setCallbacks的回调函数,消息发送成功后,调用此回调函数
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
//传递给MQTT-Client_setCallbacks的回调函数 消息到达后,调用此回调函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("---------------------------------------------------------------\n");
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
printf("---------------------------------------------------------------\n");
subMsg.mtext[0] = 0;
unsigned short len = message->payloadlen;
char *buf = (char*)message->payload;
for(unsigned short i=0; i<len; i++)
{
if(buf[i] == '\0') break;
if(buf[i]<='9' && buf[i]>='0')
subMsg.mtext[0] = subMsg.mtext[0]*10 + buf[i] - '0';
}
int ret = msgsnd(submsg_d, &subMsg.mtype, sizeof(subMsg.mtext), IPC_NOWAIT); // 非阻塞发送
if(ret != 0)
{
printf("Failed to send message.\r\n");
}
MQTTClient_freeMessage(&message); // 释放消息
MQTTClient_free(topicName); // 释放主题名
return 1;
}
//传递给MQTTClient_setCallbacks的回调函数 连接异常断开后调用此回调函数
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
//实现MQTT的发布
void *mqtt_publish(void *argv)
{
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
char data[9];
int rc;
pubmsg.qos = QOS;
pubmsg.retained = 0;
while(1)
{
//接收消息(消息队列的ID,存放消息的指针,指定接收消息的大小,0-读取消息队列中第一个数据,阻塞)
int res = msgrcv(pubmsg_d, &pubMsg, sizeof(pubMsg.mtext), 0, 0);
if(res < 0) continue;
{
//printf("Publish_hum: %d\n", pubMsg.mtext[0]);
//printf("Publish_temp: %d\n", pubMsg.mtext[1]);
sprintf(data, "%d,%d", pubMsg.mtext[0],pubMsg.mtext[1]);
pubmsg.payload = data;
pubmsg.payloadlen = sizeof(data);
if((rc = MQTTClient_publishMessage(client, PUB_TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to publish message, return code %d\n", rc);
break;
}
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
}
}
if((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) //断开和服务器的连接
{
printf("Failed to disconnect, return code %d\n", rc);
}
pthread_exit(&threads[PubThread]);
return NULL;
}
//封装主动断开连接服务器的函数
int mqtt_disconnect(void)
{
int rc = EXIT_SUCCESS;
//两个参数:MQTT客户端和断开连接超时时间
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) //断开和服务器的连接
{
printf("Failed to disconnect, return code %d\n", rc);
rc = EXIT_FAILURE;
}
else
{
printf("MQTT disconnect success\n");
MQTTClient_destroy(&client);
}
return rc;
}
// mqtt建立客户端、连接服务器、订阅主题的封装入口函数
int mqtt_iot(void)
{
int rc = EXIT_SUCCESS;
//创建客户端
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
goto exit;
}
//设置回调函数(连接丢失处理回调函数,处理订阅消息的回调函数,成功发布消息后的回调函数)
if((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
goto destroy_exit;
}
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
conn_opts.keepAliveInterval = 60;//保活周期,客户端向服务器发送心跳包的周期,单位秒
conn_opts.cleansession = 1;
//连接服务器
if((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
goto destroy_exit;
}
//订阅主题(传入客户端句柄、订阅的主题以及消息质量)
if ((rc = MQTTClient_subscribe(client, SUB_TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
goto destroy_exit;
}
//初始化信号量
if(sem_init(&discon_sem, 1, 0) != 0)
{
printf("Failed to init semaphore\n");
return -1;
}
//创建队列
pubmsg_d = msgget(0x1234, IPC_CREAT);
submsg_d = msgget(0x5678, IPC_CREAT);
if(pubmsg_d == -1 || submsg_d==-1) //返回错误码-1
{
printf("Failed to create a mqtt message, pubid:%d, subid:%d\n", pubmsg_d, submsg_d);
return -1;
}
else
{
printf("Publish message id: %d\n", pubmsg_d);
printf("Subscribe message id: %d\n", submsg_d);
}
int ret = pthread_create(&thread_mqtt_publish_t, NULL, mqtt_publish, NULL);
if(ret != 0)
{
printf("Failed to create mqtt_publish thread.\n");
return -1;
}
printf("MQTT connect success, press 'Q' or 'q' to disconnect mqtt server\n");
return 0;
destroy_exit:
MQTTClient_destroy(&client); //释放客户端的资源, 参数-同步客户端的句柄
return -1;
exit:
return -1;
}
至此,就完成了关于ELF 1开发板研发的MQTT远程温湿度监测系统介绍。希望这套实践案例能够成为各位小伙伴的宝贵参考,启迪创新思维,推进各位嵌入式爱好者在学习的道路上不断前进。