下面为你详细介绍如何基于 TCP 协议对 MQTT 进行封装,包括实现思路、代码示例及代码解释。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
// IPv4 address (dotted-decimal string) of the server the TCP socket connects to.
#define SERVER_IP "127.0.0.1"
// TCP port of the server the socket connects to.
#define SERVER_PORT 1883
/**
 * Build an MQTT 3.1.1 CONNECT packet in the caller's buffer.
 *
 * Bytes written, in order:
 *   - fixed header: packet type 0x10, then the "remaining length" in the
 *     MQTT variable-length encoding (1 to 3 bytes here);
 *   - variable header (10 bytes): protocol name "MQTT" with its 2-byte
 *     length, protocol level 4, connect flags 0x02 (clean session),
 *     keep-alive of 60 seconds;
 *   - payload: 2-byte big-endian client ID length followed by the client ID
 *     bytes (no NUL terminator is written).
 *
 * For client IDs shorter than 116 bytes the remaining length fits in one
 * byte, so the total packet size is 14 + strlen(client_id).
 *
 * @param packet     Destination buffer. The caller must provide at least
 *                   16 + strlen(client_id) bytes; nothing is written when
 *                   this is NULL.
 * @param client_id  NUL-terminated client identifier. NULL is treated as an
 *                   empty ID; IDs longer than 65535 bytes are truncated to
 *                   65535 because the length prefix is only 16 bits wide.
 */
void mqtt_connect_packet(unsigned char *packet, const char *client_id) {
    size_t id_len;
    size_t remaining;
    size_t index = 0;

    if (packet == NULL) {
        return;
    }

    id_len = (client_id != NULL) ? strlen(client_id) : 0;
    if (id_len > 0xFFFF) {
        id_len = 0xFFFF;
    }

    /* 10-byte variable header + 2-byte client ID length prefix + client ID. */
    remaining = 10 + 2 + id_len;

    /* Fixed header: CONNECT packet type. */
    packet[index++] = 0x10;

    /* Remaining length: 7 data bits per byte, bit 7 set while more follow. */
    do {
        unsigned char digit = (unsigned char)(remaining % 128);
        remaining /= 128;
        if (remaining > 0) {
            digit |= 0x80;
        }
        packet[index++] = digit;
    } while (remaining > 0);

    /* Protocol name: 2-byte length followed by "MQTT". */
    packet[index++] = 0x00;
    packet[index++] = 0x04;
    packet[index++] = 'M';
    packet[index++] = 'Q';
    packet[index++] = 'T';
    packet[index++] = 'T';

    /* Protocol level 4 (MQTT 3.1.1). */
    packet[index++] = 0x04;

    /* Connect flags: clean session only. */
    packet[index++] = 0x02;

    /* Keep-alive: 60 seconds, big-endian. */
    packet[index++] = 0x00;
    packet[index++] = 0x3C;

    /* Client ID length must be a 2-byte big-endian value, not a single byte. */
    packet[index++] = (unsigned char)((id_len >> 8) & 0xFF);
    packet[index++] = (unsigned char)(id_len & 0xFF);

    /* Copy only the ID bytes; a trailing NUL is not part of the packet. */
    if (id_len > 0) {
        memcpy(&packet[index], client_id, id_len);
    }
}
/**
 * Connect to SERVER_IP:SERVER_PORT over TCP, send a single MQTT CONNECT
 * packet for the client ID "test_client", and hex-dump the server's reply.
 *
 * @return 0 when the packet was sent and the receive step did not fail,
 *         -1 on any socket, address, connect, send or receive error.
 */
int main() {
    static const char client_id[] = "test_client";
    struct sockaddr_in server_addr;
    unsigned char packet[1024] = {0};
    unsigned char response[1024];
    size_t packet_length;
    size_t sent = 0;
    ssize_t recv_len;
    int pton_rc;
    int status = -1;
    int sockfd;

    /* Create the TCP socket. */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket creation error");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);

    /* Convert the dotted-decimal address to binary form. A return of 0 means
     * the string was not a valid address and errno is not set, so perror()
     * is only meaningful for the negative case. */
    pton_rc = inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr);
    if (pton_rc == 0) {
        fprintf(stderr, "Invalid address/ Address not supported\n");
        goto out;
    }
    if (pton_rc < 0) {
        perror("Invalid address/ Address not supported");
        goto out;
    }

    /* Connect to the server. */
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Connection Failed");
        goto out;
    }

    /* Build the MQTT CONNECT packet. */
    mqtt_connect_packet(packet, client_id);

    /* Total size of a CONNECT packet with a short client ID: 2-byte fixed
     * header + 10-byte variable header + 2-byte client ID length + the ID.
     * (The remaining-length field alone, 12 + len, excludes the fixed header.) */
    packet_length = 2 + 10 + 2 + (sizeof(client_id) - 1);

    /* send() may accept fewer bytes than requested; loop until all are out. */
    while (sent < packet_length) {
        ssize_t n = send(sockfd, packet + sent, packet_length - sent, 0);
        if (n <= 0) {
            perror("send failed");
            goto out;
        }
        sent += (size_t)n;
    }

    /* Receive the server's response. */
    recv_len = recv(sockfd, response, sizeof(response), 0);
    if (recv_len < 0) {
        perror("recv failed");
        goto out;
    }
    if (recv_len > 0) {
        printf("Received response from server:\n");
        for (ssize_t i = 0; i < recv_len; i++) {
            printf("%02X ", response[i]);
        }
        printf("\n");
    }
    status = 0;

out:
    /* Close the socket on every path so the descriptor is never leaked. */
    close(sockfd);
    return status;
}
- 订阅者代码(mqtt_subscriber.c)
#include <stdio.h>
#include <stdlib.h>
#include "MQTTClient.h"
// Broker URI handed to MQTTClient_create.
#define ADDRESS "tcp://test.mosquitto.org:1883"
// Client identifier handed to MQTTClient_create.
#define CLIENTID "ExampleClientSub"
// Topic passed to MQTTClient_subscribe.
#define TOPIC "test/topic"
// QoS level passed to MQTTClient_subscribe.
#define QOS 1
// Timeout constant. NOTE(review): presumably milliseconds - confirm against usage.
#define TIMEOUT 10000L
/**
 * Connection-lost callback: prints a notice and the reason for the loss.
 *
 * @param context  Application context pointer; not used here.
 * @param cause    Reason string. It may be NULL, and passing NULL to "%s"
 *                 is undefined behaviour, so a placeholder is printed
 *                 instead in that case.
 */
void connlost(void *context, char *cause) {
    (void)context;
    printf("\nConnection lost\n");
    printf(" cause: %s\n", cause != NULL ? cause : "(unknown)");
}
/**
 * Message-arrived callback: prints the topic and payload of a received
 * message, then releases the message and the topic string.
 *
 * The return type is int because the callback is registered through
 * MQTTClient_setCallbacks, whose message-arrived callback type returns int;
 * a non-zero value reports that the message was handled.
 *
 * @param context    Application context pointer; not used here.
 * @param topicName  Topic the message was published to; freed before return.
 * @param topicLen   Length of topicName when it is greater than 0; when it
 *                   is 0 the topic is printed as a NUL-terminated string.
 * @param message    Received message; freed before return.
 * @return 1 always, since the message and topic have been consumed and freed.
 */
int message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
    /* payload is a void pointer; it must be converted before indexing. */
    const char *payload = (const char *)message->payload;

    (void)context;

    printf("Message arrived\n");
    printf(" topic: ");
    if (topicLen > 0) {
        /* An explicit length is given, so do not rely on NUL termination. */
        fwrite(topicName, 1, (size_t)topicLen, stdout);
    } else {
        fputs(topicName, stdout);
    }
    putchar('\n');

    printf(" message: ");
    for (int i = 0; i < message->payloadlen; i++) {
        putchar(payload[i]);
    }
    putchar('\n');

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}
/**
 * Delivery-complete callback: reports the token of the confirmed message
 * on standard output.
 *
 * @param context  Application context pointer; not used here.
 * @param dt       Delivery token of the message that was confirmed.
 */
void delivered(void *context, MQTTClient_deliveryToken dt) {
    (void)context;
    fprintf(stdout, "Message with token value %d delivery confirmed\n", dt);
}
int main(int argc, char* argv[]) {
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
// 创建 MQTT 客户端实例
MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
// 设置回调函数
MQTTClient_setCallbacks(client, NULL, connlost, message_arrived, delivered);
// 连接到 MQTT Broker
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Connected\n");
// 订阅主题
if ((rc = MQTTClient_subscribe(client, TOPIC, QOS)) != MQTTCLIENT_SUCCESS) {
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribed to topic: %s\n", TOPIC);
// 持续处理消息
while(1) {
// 可在此添加更多逻辑
}
// 断开连接
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
- 发布者代码(mqtt_publisher.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
// Broker URI handed to MQTTClient_create.
#define ADDRESS "tcp://test.mosquitto.org:1883"
// Client identifier handed to MQTTClient_create.
#define CLIENTID "ExampleClientPub"
// Topic the message is published to.
#define TOPIC "test/topic"
// Message payload; its strlen is used as the payload length.
#define PAYLOAD "Hello, MQTT!"
// QoS level assigned to the published message.
#define QOS 1
// Timeout passed to MQTTClient_waitForCompletion, in milliseconds
// (main prints TIMEOUT/1000 as seconds).
#define TIMEOUT 10000L
/**
 * Publisher entry point: creates a client for ADDRESS, connects, publishes
 * PAYLOAD to TOPIC with the configured QOS, waits up to TIMEOUT for the
 * delivery to complete, then disconnects and destroys the client.
 *
 * @return the result of the last publish/wait step (MQTTCLIENT_SUCCESS when
 *         delivery was confirmed); the process exits with EXIT_FAILURE if
 *         the client cannot be created or connected.
 */
int main(int argc, char* argv[]) {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;

    (void)argc;
    (void)argv;

    /* Create the MQTT client instance. */
    rc = MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("Failed to create client, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    /* Connect to the MQTT broker. */
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("Failed to connect, return code %d\n", rc);
        MQTTClient_destroy(&client);
        exit(EXIT_FAILURE);
    }
    printf("Connected\n");

    /* Describe the message to publish. */
    pubmsg.payload = (void*)PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    /* Publish, and only wait on the token if the publish call succeeded;
     * otherwise the token was never assigned. */
    rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("Failed to publish message, return code %d\n", rc);
    } else {
        printf("Waiting for up to %d seconds for publication of %s\n"
               "on topic %s for client with ClientID: %s\n",
               (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
        rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
        if (rc == MQTTCLIENT_SUCCESS) {
            printf("Message with delivery token %d delivered\n", token);
        } else {
            printf("Delivery of token %d not confirmed, return code %d\n", token, rc);
        }
    }

    /* Disconnect and release the client. */
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}