Linux(centos)下 Mosquitto MQTT 代理的安装与配置
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,广泛应用于物联网(IoT)领域。Mosquitto 是一个开源的 MQTT 代理,它支持 MQTT 协议 3.1 和 3.1.1,适用于各种设备和平台。
在工业上使用 MQTT 协议来进行物联网数据传输,主要看中了以下优点:
- 低协议开销:它的每消息标题可以短至 2 个字节,容错性好
- 物联网的网络环境往往比较恶劣,MQTT能从断开故障中恢复,并且不需要额外的编码
- 低功耗:MQTT专门为了低功耗的目标而设计
- 最多能接受百万级别的客户端
安装 Mosquitto
安装
选择需要安装的版本,我这里安装的是 2.0.0 版。
官网下载:https://mosquitto.org/files/source/
# 下载
wget --no-check-certificate https://mosquitto.org/files/source/mosquitto-2.0.0.tar.gz
# 安装
tar -zxvf mosquitto-2.0.0.tar.gz
cd mosquitto-2.0.0
make
make install
如果 make 时报如下错误
In file included from mosquitto_ctrl.c:29:
mosquitto_ctrl.h:21:10: fatal error: cJSON.h: No such file or directory
21 | #include <cJSON.h>
| ^~~~~~~~~
compilation terminated.
make[2]: *** [Makefile:49: mosquitto_ctrl.o] Error 1
make[2]: Leaving directory '/usr/local/mosquitto-2.0.0/apps/mosquitto_ctrl'
make[1]: *** [Makefile:9: all] Error 2
make[1]: Leaving directory '/usr/local/mosquitto-2.0.0/apps'
make: *** [Makefile:64: mosquitto] Error 2
出现 cJSON 找不到,需要安装 cJSON(cJSON-1.7.18.tar.gz):
下载地址:https://github.com/DaveGamble/cJSON/releases
# 安装
tar -zxvf cJSON-1.7.18.tar.gz
cd cJSON-1.7.18
make
make install
cJSON 安装成功后再进行 make 安装即可成功。
相关配置
Mosquitto 的配置文件通常位于:/etc/mosquitto/mosquitto.conf
根据需要修改配置文件中的设置,可以更改监听地址和端口、设置持久化选项、配置用户认证等。
创建配置文件
cp /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf
监听端口
一般使用mqtt默认监听端口 1883
, 如果需要修改监听端口,需修改配置文件设置自己的端口即可(我这里使用默认的)。
配置用户认证
MQTT 默认无需用户认证,配置 MQTT 的用户名和密码通常涉及编辑 MQTT 代理的配置文件。
Mosquitto 使用 pwfile 来存储用户名和密码,可以使用 mosquitto_passwd 命令来创建和管理这个文件。
# 创建密码文件,用户名称为 mosquittoadmin
mosquitto_passwd -c /etc/mosquitto/pwfile.conf mosquittoadmin
系统会提示输入密码并再次确认,-c
选项表示创建一个新的密码文件。如果文件已经存在,则使用 -b
选项追加用户。
编辑 Mosquitto 配置文件 /etc/mosquitto/mosquitto.conf
,添加或修改以下行以启用密码文件认证:
allow_anonymous false
password_file /etc/mosquitto/pwfile
创建 mosquitto 用户
配置文件中默认使用 user mosquitto,需要创建 mosquitto 用户:
groupadd mosquitto
useradd -g mosquitto mosquitto
chown -R mosquitto:mosquitto /etc/mosquitto/
设置外网访问
# 设置外网访问
listener 1883 0.0.0.0
启动、查看、关闭程序
运行程序
mosquitto -c /etc/mosquitto/mosquitto.conf -d
查看
ps -aux | grep mosquitto
关闭程序
kill -9 $(pidof mosquitto)
SpringBoot 整合 Mosquitto
添加依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
配置 MQTT 服务器的连接信息
server:
port: 8001
mqtt:
url: tcp://127.0.0.1:1883
username: mosquittoadmin
password: mosquittoadmin@123
client-id: test-client
MQTT 配置类
package com.demo.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @path:com.demo.config.MqttClientConfig.java
* @className:MqttClientConfig.java
* @description: MQTT配置
* @dateTime:2025/1/5 10:51
* @editNote:
*/
@Configuration
public class MqttClientConfig {
@Value("${mqtt.url}")
private String url;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = new MqttClient(url, clientId);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
client.connect(options);
return client;
}
}
发布、订阅消息
package com.demo.service;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @path:com.demo.service.MqttService.java
* @className:MqttService.java
* @description: 发布、订阅消息
* @dateTime:2025/1/5 10:51
* @editNote:
*/
@Slf4j
@Service
public class MqttService {
@Autowired
private MqttClient mqttClient;
/**
* @MonthName: publish
* @Description: 发布消息
* @Date: 2025/1/5 11:26
* @Param: [topic, message]
* @return: void
**/
public void publish(String topic, String message) throws MqttException {
log.info("===发布消息====topic:{},消息内容:{}", topic, message);
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(2);
mqttClient.publish(topic, mqttMessage);
}
/**
* @MonthName: subscribe
* @Description: 订阅消息
* @Date: 2025/1/5 11:26
* @Param: [topic]
* @return: void
**/
public void subscribe(String topic) throws MqttException {
mqttClient.subscribe(topic, (t, msg) -> {
String message = new String(msg.getPayload());
log.info("===订阅消息====topic:{},消息内容:{}", topic, message);
});
}
}
** MqttController 接口**
package com.demo.controller;
import com.demo.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttService mqttService;
@GetMapping("/publish")
public String publish(@RequestParam String topic, @RequestParam String message) {
try {
mqttService.publish(topic, message);
log.info("发布消息,topic:{},消息内容:{}", topic, message);
return "success";
} catch (MqttException e) {
log.error("{}", e);
return "error";
}
}
@GetMapping("/subscribe")
public String subscribe(@RequestParam String topic) {
try {
mqttService.subscribe(topic);
log.info("订阅消息,topic:{}", topic);
return "success";
} catch (MqttException e) {
log.error("{}", e);
return "error";
}
}
}
初始化订阅消息
服务初始化启动完成后,自动订阅 Topic
package com.demo.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @path:com.demo.service.SubscribeRunner.java
* @className:SubscribeRunner.java
* @description: 初始化订阅消息
* @dateTime:2025/1/5 15:25
* @editNote:
*/
@Slf4j
@Component
@Order(10)
public class SubscribeRunner implements ApplicationRunner {
@Autowired
MqttService mqttService;
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("================初始化订阅消息==================");
mqttService.subscribe("mytopic");
}
}
启动服务测试
启动服务
Connected to the target VM, address: '127.0.0.1:52517', transport: 'socket'
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.7.9)
2025-01-05 21:28:00.464 INFO 8888 --- [ main] com.demo.DemoBootApplication : Starting DemoBootApplication using Java 1.8.0_251 on DESKTOP-BBRENPM with PID 8888 (E:\workspace\demo-boot\target\classes started by Administrator in E:\workspace\demo-boot)
2025-01-05 21:28:00.466 INFO 8888 --- [ main] com.demo.DemoBootApplication : No active profile set, falling back to 1 default profile: "default"
2025-01-05 21:28:00.949 INFO 8888 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8001 (http)
自动订阅消息,可以通过请求接口手动订阅:http://localhost:8001/mqtt/subscribe?topic=mytopic
2025-01-05 21:28:01.666 INFO 8888 --- [ main] com.demo.DemoBootApplication : Started DemoBootApplication in 1.427 seconds (JVM running for 1.932)
2025-01-05 21:28:01.667 INFO 8888 --- [ main] com.demo.service.SubscribeRunner : ================初始化订阅消息==================
2025-01-05 21:28:16.428 INFO 8888 --- [nio-8001-exec-1] o.a.c.c.C.[.[localhost] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-05 21:28:16.428 INFO 8888 --- [nio-8001-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2025-01-05 21:28:16.428 INFO 8888 --- [nio-8001-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms
发布消息,浏览器访问接口:
http://localhost:8001/mqtt/publish?message=测试消息
2025-01-05 21:29:34.603 INFO 8888 --- [nio-8001-exec-8] com.demo.controller.MqttController : 发布消息,topic:mytopic,消息内容:测试消息
2025-01-05 21:29:34.613 INFO 8888 --- [ll: test-client] com.demo.service.MqttService : ===订阅消息====topic:mytopic,消息内容:测试消息