在VUE中使用MQTT
1、创建vue项目(使用node版本为20.12.0)
>>npm create vite@latest
Need to install the following packages:
create-vite@6.1.1
Ok to proceed? (y) y
√ Project name: ... mqtt-vue
√ Select a framework: » Vue
√ Select a variant: » JavaScript
2、进入项目目录,安装依赖
npm install
npm install element-plus --save
npm install mqtt --save
3、在main.js
import { createApp } from 'vue'
import './style.css'
import App from './App.vue'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
const app = createApp(App)
app.use(ElementPlus)
app.mount('#app')
4、在components文件目录下创建MqttDemo.vue
<script setup>
//消息质量取值数组
const qosList = [0,1,2]
</script>
<template>
<div class="mqtt-demo">
<el-card>
<h3>配置信息</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="protocol" label="选择协议">
<el-select>
<el-option label="ws://" value="ws"></el-option>
<el-option label="wss://" value="wss"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="host" label="主机地址">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="port" label="端口号">
<el-input type="number" placeholder="8083/8084"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="clientId" label="客户端ID">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="username" label="用户名">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="password" label="密码">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="24">
<el-button type="primary">建立连接</el-button>
<el-button type="danger">断开连接</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
<el-card>
<h3>订阅主题</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="topic" label="Topic">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="qos" label="Qos">
<el-select>
<el-option
v-for="qos in qosList"
:key="qos"
:label="qos"
:value="qos"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-button type="primary" class="sub-btn">订阅主题</el-button>
<el-button type="primary" class="sub-btn">取消订阅</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
<el-card>
<h3>发布消息</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="topic" label="Topic">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="payload" label="Payload">
<el-input></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="qos" label="Qos">
<el-select>
<el-option
v-for="qos in qosList"
:key="qos"
:label="qos"
:value="qos"></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
</el-form>
<el-col :span="24" class="text-right">
<el-button type="primary">发布消息</el-button>
</el-col>
</el-card>
<el-card>
<h3>接收到的消息</h3>
<el-col :span="24">
<el-input
type="textarea"
:rows="3"
readonly></el-input>
</el-col>
</el-card>
</div>
</template>
5、在App.vue中
<script setup>
import MqttDemo from './components/MqttDemo.vue'
</script>
<template>
<MqttDemo />
</template>
<style scoped>
</style>
6、建立-断开连接
<script setup>
import {ref} from 'vue';
import mqtt from 'mqtt';
//消息质量取值数组
const qosList = [0,1,2];
//定义连接参数的对象
const connectionInfo = ref({
protocol:'ws',
host:'127.0.0.1',
port:'8083',
clientId:'emqx_vue_client_'+Math.random().toString().substring(2,8),
username:'user',
password:'123456'
})
//建立连接
const clientInitData = ref({
connected: false
})
const client = ref({})
const createConnection = () => {
const { protocol, host, port, clientId, ...options } = connectionInfo.value
const connectionUrl = `${protocol}://${host}:${port}/mqtt`
client.value = mqtt.connect(connectionUrl,options)
clientInitData.value.connected = true
}
//断开连接
const closeConnection = () =>{
client.value.end(false,()=>{
clientInitData.value.connected = false;
})
}
</script>
============================================================================================================
<el-card>
<h3>配置信息</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="protocol" label="选择协议">
<el-select v-model="connectionInfo.protocol">
<el-option label="ws://" value="ws"></el-option>
<el-option label="wss://" value="wss"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="host" label="主机地址">
<el-input v-model="connectionInfo.host"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="port" label="端口号">
<el-input type="number" v-model="connectionInfo.port" placeholder="8083/8084"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="clientId" label="客户端ID">
<el-input v-model="connectionInfo.clientId"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="username" label="用户名">
<el-input v-model="connectionInfo.username"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="password" label="密码">
<el-input v-model="connectionInfo.password"></el-input>
</el-form-item>
</el-col>
<el-col :span="24">
<el-button type="primary" :disabled="clientInitData.connected" @click="createConnection">建立连接</el-button>
<el-button type="danger" :disabled="!clientInitData.connected" @click="closeConnection">断开连接</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
7、订阅主题-取消订阅
import { ElMessage } from 'element-plus'
//订阅主题对象
const subscriptionInfo = ref({
topic:'',
qos: 0
})
//接收消息对象
const receiverMessages = ref({})
const subscriptionInitData = ref({
subscription: false
})
//订阅主题
const subscriptionTopicHandler = () =>{
const { topic, qos } = subscriptionInfo.value
client.value.subscribe(topic, { qos }, (error, res) => {
if(error){
ElMessage.error("订阅主题失败")
return ;
}
subscriptionInitData.value.subscription=true
//给连接对象注册接收消息的事件
client.value.on('message',(topic, message)=>{
receiverMessages.value = topic + "------>" + message
})
ElMessage({
message: '订阅主题成功',
type: 'success',
})
})
}
//取消订阅
const unSubscriptionTopicHandler = () => {
const { topic, qos } = subscriptionInfo.value
client.value.unsubscribe(topic, { qos }, (error, res)=>{
if(error){
ElMessage.error("取消主题订阅失败")
return ;
}
subscriptionInitData.value.subscription = false;
ElMessage({
message: '取消订阅成功',
type: 'success',
})
})
}
============================================================================================================
<el-card>
<h3>订阅主题</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="topic" label="Topic">
<el-input v-model="subscriptionInfo.topic"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="qos" label="Qos">
<el-select v-model="subscriptionInfo.qos">
<el-option
v-for="qos in qosList"
:key="qos"
:label="qos"
:value="qos"></el-option>
</el-select>
</el-form-item>
</el-col>
<el-col :span="8">
<el-button type="primary" class="sub-btn" :disabled="subscriptionInitData.subscription" @click="subscriptionTopicHandler">订阅主题</el-button>
<el-button type="primary" class="sub-btn" :disabled="!subscriptionInitData.subscription" @click="unSubscriptionTopicHandler">取消订阅</el-button>
</el-col>
</el-row>
</el-form>
</el-card>
8、发布消息
//发送消息对象
const publishInfo= ref({
topic: '',
qos: 0,
payload: ''
})
//发布消息
const doPublish = () =>{
const { topic, qos, payload } = publishInfo.value
client.value.publish(topic, payload, { qos }, (error, res) => {
if(error){
ElMessage.error("发布消息失败")
return;
}
ElMessage({
message: '发布消息成功',
type: 'success',
})
})
}
============================================================================================================
<el-card>
<h3>发布消息</h3>
<el-form label-position="top">
<el-row :gutter="20">
<el-col :span="8">
<el-form-item prop="topic" label="Topic">
<el-input v-model="publishInfo.topic"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="payload" label="Payload">
<el-input v-model="publishInfo.payload"></el-input>
</el-form-item>
</el-col>
<el-col :span="8">
<el-form-item prop="qos" label="Qos">
<el-select v-model="publishInfo.qos">
<el-option
v-for="qos in qosList"
:key="qos"
:label="qos"
:value="qos"></el-option>
</el-select>
</el-form-item>
</el-col>
</el-row>
</el-form>
<el-col :span="24" class="text-right">
<el-button type="primary" @click="doPublish">发布消息</el-button>
</el-col>
</el-card>
<el-card>
<h3>接收到的消息</h3>
<el-col :span="24">
<el-input v-model="receiverMessages"
type="textarea"
:rows="3"
readonly></el-input>
</el-col>
</el-card>
在java中使用MQTT
方式一:eclipse.paho.client.mqttv3
1、创建springboot项目,添加依赖
<!-- mqtt客户端依赖项-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
2、建立连接
public void createConnection() throws MqttException {
//定义连接相关参数
String serverURI = "tcp://127.0.0.1:1883";
String clientId = "pacho_client_123";
//创建MqttClient对象
MqttClient mqttClient = new MqttClient(serverURI, clientId, new MemoryPersistence());
//创建MqttConnectOptions对象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("user");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setCleanSession(true);
//发送建立连接的请求
mqttClient.connect(mqttConnectOptions);
//让当前方法处于阻塞状态
while (true);
}
3、发送消息
public void sendMsg() throws MqttException {
//定义连接相关参数
String serverURI = "tcp://127.0.0.1:1883";
String clientId = "pacho_client_123";
//创建MqttClient对象
MqttClient mqttClient = new MqttClient(serverURI, clientId, new MemoryPersistence());
//创建MqttConnectOptions对象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("user");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setCleanSession(true);
//发送建立连接的请求
mqttClient.connect(mqttConnectOptions);
//发送消息对象
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload("hello mqtt java client".getBytes(StandardCharsets.UTF_8));
mqttClient.publish("aa",mqttMessage); //主题,发送的消息
//关闭连接
mqttClient.disconnect();
mqttClient.close();
}
4、订阅主题、接收消息
public void receiveMsg() throws MqttException {
//定义连接相关参数
String serverURI = "tcp://127.0.0.1:1883";
String clientId = "pacho_client_123";
//创建MqttClient对象
MqttClient mqttClient = new MqttClient(serverURI, clientId, new MemoryPersistence());
//创建MqttConnectOptions对象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("user");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) { //连接丢失时执行
System.out.println("connect lose...");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { //消息接收时执行
System.out.println("topic--->"+topic);
byte[] payload = mqttMessage.getPayload();
System.out.println("msg--->"+new String(payload));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //消息接收完毕后执行
System.out.println("delivery complete...");
}
});
//发送建立连接的请求
mqttClient.connect(mqttConnectOptions);
//订阅主题
mqttClient.subscribe("aa",2);
当给主题为aa发送消息时,运行结果显示
topic--->aa
msg--->{
"msg": "aaacccc"
}
方式二:spring-integration-mqtt
1、创建springboot项目,添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>mqtt-java</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mqtt-java</name>
<description>mqtt-java</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- springboot集成中间件的基础依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- springboot集成mqtt客户端的依赖-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
</project>
2、在application.yml中添加mqtt的配置信息
spring:
mqtt:
username: user
password: 123456
url: tcp://127.0.0.1:1883
subClientId: sub_client_id_123
subTopic: java/aa
pubClientId: pub_client_id_123
3、添加读取mqtt配置信息的配置类MqttConfigurationProperties
package com.example.mqttjava;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix="spring.mqtt")
public class MqttConfigurationProperties {
private String username;
private String password;
private String url;
private String subClientId;
private String subTopic;
private String pubClientId;
}
4、在启动类MqttJavaApplication中添加读取配置信息的注解
package com.example.mqttjava;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
@EnableConfigurationProperties(value = MqttConfigurationProperties.class)
public class MqttJavaApplication {
public static void main(String[] args) {
SpringApplication.run(MqttJavaApplication.class, args);
}
}
5、配置MQTT客户端连接工厂类MqttConfiguration
package com.example.mqttjava;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@Configuration
public class MqttConfiguration {
@Autowired
private MqttConfigurationProperties mqttConfigurationProperties;
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(){
DefaultMqttPahoClientFactory mqttPahoClientFactory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(mqttConfigurationProperties.getUsername());
options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
mqttPahoClientFactory.setConnectionOptions(options);
return mqttPahoClientFactory;
}
}
6、MQTT消息入站处理类MqttInboundConfiguration
首先创建一个消息通道,设置一个MQTT入栈适配器来订阅特定的主题并接收消息,然后将这些消息发送到消息通道,最后由指定的消息处理器来处理这些消息
package com.example.mqttjava;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttInboundConfiguration {
@Autowired
private MqttConfigurationProperties mqttConfigurationProperties;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
@Autowired
private ReceiverMessageHandler receiverMessageHandler;
//消息通道
@Bean
public MessageChannel messageInboundChannel(){
return new DirectChannel();
}
//配置入站适配器:设置要订阅的主题,指定消息相关属性
@Bean
public MessageProducer messageProducer(){
MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
mqttConfigurationProperties.getUrl(),
mqttConfigurationProperties.getSubClientId(),
mqttPahoClientFactory,
mqttConfigurationProperties.getSubTopic().split(",")
);
mqttPahoMessageDrivenChannelAdapter.setQos(1);
mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());
return mqttPahoMessageDrivenChannelAdapter;
}
@Bean
@ServiceActivator(inputChannel = "messageInboundChannel")
public MessageHandler messageHandler(){
return receiverMessageHandler;
}
}
7、接收MQTT消息处理器ReceiverMessageHandler
package com.example.mqttjava;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
@Component
public class ReceiverMessageHandler implements MessageHandler {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
System.out.println("payload: "+payload);
System.out.println("message: "+message);
MessageHeaders headers = message.getHeaders();
String topicName = headers.get("mqtt_receivedTopic").toString();
System.out.println("topicName: "+topicName);
}
}
输出:
payload: {
"msg": "aaacccc"
}
message: GenericMessage [payload={
"msg": "aaacccc"
}, headers={mqtt_receivedRetained=true, mqtt_id=0, mqtt_duplicate=false, id=873967f5-6a6f-c092-f276-62ed1b1d3c18, mqtt_receivedTopic=java/aa, mqtt_receivedQos=0, timestamp=1738572747948}]
topicName: java/aa
8、MQTT消息出站处理类MqttOutboundConfiguration
-
@ServiceActivator(inputChannel = "mqttOutboundChannel")
:这个注解将mqttOutBoundMessageHandler
标记为一个服务激活器,并将其与mqttOutboundChannel
消息通道关联起来。这意味着,当消息被发送到mqttOutboundChannel
时,mqttOutBoundMessageHandler
将负责处理这些消息(即将它们发送到MQTT代理)。
package com.example.mqttjava;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class MqttOutboundConfiguration {
@Autowired
private MqttConfigurationProperties mqttConfigurationProperties;
@Autowired
private MqttPahoClientFactory mqttPahoClientFactory;
//配置消息通道
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
//配置出站消息处理器
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutBoundMessageHandler(){
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(
mqttConfigurationProperties.getUrl(),
mqttConfigurationProperties.getPubClientId(),
mqttPahoClientFactory
);
mqttPahoMessageHandler.setDefaultQos(0);
mqttPahoMessageHandler.setDefaultTopic("default");
mqttPahoMessageHandler.setAsync(true);
return mqttPahoMessageHandler;
}
}
9、MQTT网关接口MqttGateway,用于向MQTT发送消息
package com.example.mqttjava;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);
public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos, String payload);
}
10、向MQTT发送消息的服务类MqttMessageSender
package com.example.mqttjava;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqttMessageSender {
@Autowired
private MqttGateway mqttGateway;
public void sendMsg(String topic, String msg){
mqttGateway.sendMsgToMqtt(topic,msg);
}
public void sendMsg(String topic, int qos, String msg){
mqttGateway.sendMsgToMqtt(topic, qos, msg);
}
}
11、发送消息测试
package com.example.mqttjava;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = MqttJavaApplication.class)
public class MqttTest {
@Autowired
private MqttMessageSender mqttMessageSender;
@Test
public void sendToMsg(){
mqttMessageSender.sendMsg("java/aa","hello");
}
}
payload: hello
message: GenericMessage [payload=hello, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=35e8e42b-cc22-f4df-8ba1-b184616ded03, mqtt_receivedTopic=java/aa, mqtt_receivedQos=0, timestamp=1738572748073}]
topicName: java/aa