SpringBoot+RabbitMQ实现MQTT协议通讯

一、简介

MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。此处使用RabbitMQ
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

二、环境准备

2.1 Erlang安装

使用rabbitMQ首先需要安装Erlang环境,因为rabbitMQ是用Erlang语言编写的。

2.1.1 下载安装

官网下载:https://www.erlang.org/patches/otp-26.0 (比较慢,不推荐)
在这里插入图片描述

百度网盘下载:https://pan.baidu.com/s/1xU4syn14Bh7QR-skjm_hOg (推荐)
提取码:az1t

2.1.2 环境变量

进入高级系统设置
在这里插入图片描述
在这里插入图片描述
环境变量: 变量名-ERLANG_HOME 变量值-文件安装路径
在这里插入图片描述
配置path: 配置完上面的之后,找到系统变量中的path点击编辑,然后新建:%ERLANG_HOME%\bin
在这里插入图片描述
验证: 进入cmd,输入 erl -version 显示版本号就说明安装成功
在这里插入图片描述

2.2 RabbtiMQ安装

2.2.1 下载安装

官网下载:http://www.rabbitmq.com/download.html
下载后一通傻瓜式安装即可。
在这里插入图片描述

2.2.2 环境变量

变量名-RABBITMQ_SERVER 变量值-文件安装路径
在这里插入图片描述
编辑path,点击新建按钮,输入%RABBITMQ_SERVER%\sbin,点击确定
在这里插入图片描述

2.2.3 安装mqtt插件

rabbitmq-plugins enable rabbitmq_mqtt

在这里插入图片描述

2.2.4 管理控制台安装

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

2.2.5 访问测试

登录测试: 浏览器输入 http://localhost:15672 ,输入用户名:guest,密码:guest在这里插入图片描述

三、代码实现

3.1 引入依赖

<!-- rabbitmq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!--mqtt依赖包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

3.2 yml配置

# mqtt配置
mqtt:
  url: ***********
  username: ***********
  password: ***********
  # 间隔时间
  keep-alive-interval: 60
  # 超时时间
  completion-timeout: 30000
  # 会话保持,默认为false
  clean-session: false
  # 自动连接,默认为true
  automatic-reconnect: true
  # 生产者配置
  producer:
    # 很重要
    client-id: producer1
    topic: demo-topic
    # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
    qos: 1
  # 消费者配置
  subscriber:
    # 很重要
    client-id: subscriber1
    topic: demo-topic
    # 传输质量 QoS 0:最多分发一次 QoS 1:至少分发一次(默认) QoS 2:只分发一次
    qos: 0

3.3 配置属性类

package com.qiangesoft.mqtt.config;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.stereotype.Component;

/**
 * mqtt配置
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Data
@Component
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperty {

    /**
     * 服务地址
     */
    private String url;

    /**
     * 账号
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 间隔时间
     */
    private int keepAliveInterval;

    /**
     * 超时时间
     */
    private int completionTimeout;

    /**
     * 会话保持,默认为false
     */
    private boolean cleanSession;

    /**
     * 自动连接,默认为true
     */
    private boolean automaticReconnect = true;

    /**
     * 生产者
     */
    private Client producer = new Client();

    /**
     * 消费者
     */
    private Client subscriber = new Client();

    @Data
    public class Client {
        /**
         * 客户端id
         */
        private String clientId;

        /**
         * 默认主题
         */
        private String topic;

        /**
         * 传输质量
         * QoS 0:最多分发一次
         * QoS 1:至少分发一次(默认)
         * QoS 2:只分发一次
         */
        private int qos = 1;
    }

    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        // 连接参数
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{url});
        if (StringUtils.isNotBlank(this.username)) {
            options.setUserName(this.username);
        }
        if (StringUtils.isNotBlank(this.password)) {
            options.setPassword(this.password.toCharArray());
        }
        // 心跳时间
        options.setKeepAliveInterval(this.keepAliveInterval);
        // 断开是否自动重联
        options.setAutomaticReconnect(this.automaticReconnect);
        // 保持session,客户端上线后会接受到它离线的这段时间的消息
        options.setCleanSession(this.cleanSession);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
//        options.setWill("willTopic", WILL_DATA, 2, false);
        factory.setConnectionOptions(options);
        return factory;
    }
}
package com.qiangesoft.mqtt.constant;

/**
 * mqtt通用常量信息
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
public class MqttConstant {

    /**
     * 生产者管道
     */
    public static final String OUTBOUND_CHANNEL = "outboundChannel";

    /**
     * 消费者管道
     */
    public static final String INBOUND_CHANNEL = "inboundChannel";
}

3.4 生产者

package com.qiangesoft.mqtt.producer;

import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * mqtt生产者
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Configuration
public class MqttProducerConfig {

    @Autowired
    private MqttProperty mqttProperty;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * 消息生产通道
     *
     * @return
     */
    @Bean(name = MqttConstant.OUTBOUND_CHANNEL)
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    /**
     * 消息发布
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConstant.OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperty.getProducer().getClientId(), mqttPahoClientFactory);
        messageHandler.setAsync(false);
        messageHandler.setDefaultQos(mqttProperty.getProducer().getQos());
        messageHandler.setDefaultTopic(mqttProperty.getProducer().getTopic());
        return messageHandler;
    }
}

3.5 消费者

package com.qiangesoft.mqtt.subscriber;

import com.qiangesoft.mqtt.config.MqttProperty;
import com.qiangesoft.mqtt.constant.MqttConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Objects;

/**
 * mqtt消费者
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Slf4j
@Configuration
public class MqttSubscriberConfig {

    @Autowired
    private MqttProperty mqttProperty;
    @Autowired
    private MqttPahoClientFactory mqttPahoClientFactory;

    /**
     * 消息订阅通道
     *
     * @return
     */
    @Bean(name = MqttConstant.INBOUND_CHANNEL)
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }

    /**
     * 消息订阅通道绑定
     *
     * @return
     */
    @Bean
    public MessageProducer mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperty.getSubscriber().getClientId(),
                mqttPahoClientFactory, mqttProperty.getSubscriber().getTopic());
        adapter.setCompletionTimeout(mqttProperty.getCompletionTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(mqttProperty.getSubscriber().getQos());
        adapter.setOutputChannel(inboundChannel());
        return adapter;
    }

    /**
     * 消息订阅
     *
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConstant.INBOUND_CHANNEL)
    public MessageHandler messageHandler() {
        return message -> {
            try {
                String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
                log.info("订阅主题为: {}", topic);
                String payload = message.getPayload().toString();
                log.info("订阅接收到消息:{}", payload);
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}

3.6 消息发送网关

package com.qiangesoft.mqtt.service;

import com.qiangesoft.mqtt.constant.MqttConstant;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * 消息发送网关
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConstant.OUTBOUND_CHANNEL)
public interface MqttGateway {

    /**
     * 发送到mqtt
     *
     * @param payload 消息内容
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

3.7 发送测试

package com.qiangesoft.mqtt.controller;

import com.qiangesoft.mqtt.service.MqttGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 控制器
 *
 * @author qiangesoft
 * @date 2024-04-24
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttGateway mqttGateway;

    @GetMapping("/send")
    public String send(String message) {
        mqttGateway.sendToMqtt(message);
        return "success";
    }
}

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/574623.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构之堆

片头 嗨! 小伙伴们,上一篇中,我们学习了队列相关知识,今天我们来学习堆这种数据结构,准备好了吗? 我们开始咯 ! 一、堆 1.1 堆的概念 堆&#xff08;Heap&#xff09;是一种特殊的树,如果将一个集合中的所有元素按照完全二叉树的顺序存储方式存储在一个一维数组中,并满足一定…

物联网:从电信物联开发平台AIoT获取物联设备上报数据示例

设备接入到电信AIoT物联平台后&#xff0c;可以在平台上查询到设备上报的数据。 下面就以接入的NBIOT物联远传水表为例。 在产品中选择指定设备&#xff0c;在数据查看中可以看到此设备上报的数据。 示例中这组数据是base64位加密的&#xff0c;获取后还需要转换解密。 而我…

Oceanbase体验之(一)运维管理工具OCP部署(社区版4.2.2)

资源规划建议 ocp主机1台 内存:64G CPU1:2C及以上 硬盘大于500G observer服务器3台 内存32G CPU&#xff1a;4C以上 硬盘大于1T 建议存储硬盘与操作系统硬盘隔开实现IO隔离 一、OBD、OCP安装包准备 [rootobserver /]# chown -R admin:admin /software/ [rootobserver /]# …

【ensp实验】Telnet 协议

目录 Telnet 协议 telnet协议特点 Telnet实验 ​编辑 不使用console口 三种认证模式的区别 Telnet 协议 Telnet 协议是 TCP/IP 协议族中的一员&#xff0c;是 Internet 远程登录服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的能力。在终端使用…

【Leetcode每日一题】 穷举vs暴搜vs深搜vs回溯vs剪枝_全排列 - 子集(难度⭐⭐)(65)

1. 题目解析 题目链接&#xff1a;78. 子集 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 算法思路详解&#xff1a; 为了生成数组 nums 的所有子集&#xff0c;我们需要对数组中的每个元素进行“选择”或“不选择…

三星电脑文件夹误删了怎么办?恢复方案在此

在使用三星电脑的过程中&#xff0c;我们可能会不小心删除了某个重要的文件夹&#xff0c;其中可能包含了工作文件、家庭照片、视频或其他珍贵的数据。面对这种突发情况&#xff0c;不必过于焦虑。本文将为您提供几种有效的恢复方案&#xff0c;希望能帮助您找回误删的文件夹及…

微软ML Copilot框架释放机器学习能力

摘要&#xff1a;大模型席卷而来&#xff0c;通过大量算法模型训练推理&#xff0c;能根据人类输入指令产生图文&#xff0c;其背后是大量深度神经网络模型在做运算&#xff0c;这一过程称之为机器学习&#xff0c;本文从微软语言大模型出发&#xff0c;详解利用大型语言模型&a…

【鸿蒙应用】理财App

目录 第一节项目讲解项目介绍 第二节&#xff1a;项目创建登录静态框架编写登录页面设稿新建项目控制台添加项目Login页面封装标题组件 第三节&#xff1a;登录页静态表单编写第四节—内容页架构分析底部栏组件第五节—底部栏组件切换第六节&#xff1a;首页静态页编写第七节&a…

【中级软件设计师】上午题12-软件工程(2):单元测试、黑盒测试、白盒测试、软件运行与维护

【中级软件设计师】上午题12-软件工程&#xff08;2&#xff09; 1 系统测试1.1 单元测试1.2 集成测试1.2.1 自顶向下1.2.2 自顶向上1.2.3 回归测试 2 测试方法2.1 黑盒测试2.1.1 McCabe度量法 2.2 白盒测试2.2.1 语句覆盖-“每个流程”执行一次2.2.2 判定覆盖2.2.3 条件覆盖-A…

BGP的基本概念和工作原理

AS的由来 l Autonomous System 自治系统&#xff0c;为了便于管理规模不断扩大的网络&#xff0c;将网络划分为不同的AS l 不同AS通过AS号区分&#xff0c;AS号取值范围1&#xff0d;65535&#xff0c;其中64512&#xff0d;65535是私有AS号 l IANA机构负责AS号的分发 AS之…

Ubuntu关闭防火墙、关闭selinux、关闭swap

关闭防火墙 打开终端&#xff0c;然后输入如下命令&#xff0c;查看防火墙状态&#xff1a; sudo ufw status 开启防火墙命令如下&#xff1a; sudo ufw enable 关闭防火墙命令如下&#xff1a; sudo ufw disable 关闭selinux setenforce 0 && sed -i s/SELINUXe…

Android kotlin 协程异步async与await介绍与使用

一、介绍 在kotlin语言中&#xff0c;协程是一个处理耗时的操作&#xff0c;但是很多人都知道同步和异步&#xff0c;但是不知道该如何正确的使用&#xff0c;如果处理不好&#xff0c;看似异步&#xff0c;其实在runBloacking模块中使用的结果是同步的。 针对如何同步和如何异…

鸿蒙应用ArkTS开发- 选择图片、文件和拍照功能实现

前言 在使用App的时候&#xff0c;我们经常会在一些社交软件中聊天时发一些图片或者文件之类的多媒体文件&#xff0c;那在鸿蒙原生应用中&#xff0c;我们怎么开发这样的功能呢&#xff1f; 本文会给大家对这个功能点进行讲解&#xff0c;我们采用的是拉起系统组件来进行图片…

03-JAVA设计模式-备忘录模式

备忘录模式 什么是备忘录模式 Java中的备忘录模式&#xff08;Memento Pattern&#xff09;是一种行为型设计模式&#xff0c;它允许在不破坏封装性的前提下捕获一个对象的内部状态&#xff0c;并在该对象之外保存这个状态&#xff0c;以便以后可以将对象恢复到原先保存的状态…

Ansible自动化

Ansible自动化 自动化的需求&#xff1a; 1. 在什么样的场景下需要自动化&#xff1f; 批量化的工作&#xff1a; 装软件包、配置服务、升级、下发文件… 2. 为什么在自动化工具中选择ansible&#xff1f; 对比shell脚本&#xff1a; 相对于用shell的脚本来实现自动化&#x…

18.Nacos配置管理-微服务读取Nacos中的配置

需要解决的问题 1.实现配置更改热更新&#xff0c;而不是改动了配置文件还要去重启服务才能生效。 2.对多个微服务的配置文件统一集中管理。而不是需要对每个微服务逐一去修改配置文件&#xff0c;特别是公共通用的配置。 配置管理服务中的配置发生改变后&#xff0c;回去立…

主成分分析(PCA):揭秘数据的隐藏结构

在数据分析的世界里&#xff0c;我们经常面临着处理高维数据的挑战。随着维度的增加&#xff0c;数据处理、可视化以及解释的难度也随之增加&#xff0c;这就是所谓的“维度的诅咒”。主成分分析&#xff08;PCA&#xff09;是一种强大的统计工具&#xff0c;用于减少数据的维度…

python爬虫插件XPath的安装

概要 XPath Helper是一款专用于chrome内核浏览器的实用型爬虫网页解析工具。XPath可以轻松快捷地找到目标信息对应的Xpath节点&#xff0c;获取xpath规则&#xff0c;并提取目标信息&#xff0c;并进行校对测试&#xff1b;可对查询出的xpath进行编辑&#xff0c;正确编辑的结…

计算机网络和因特网

Internet: 主机/端系统&#xff08;end System / host&#xff09;&#xff1a; 硬件 操作系统 网络应用程序 通信链路&#xff1a; 光纤、网络电缆、无线电、卫星 传输效率&#xff1a;带宽&#xff08;bps&#xff09; 分组交换设备&#xff1a;转达分组 包括&#…

DAP-seq助力揭示转录因子在草地贪夜蛾Bt抗性中重要作用

2024年4月6日&#xff0c;武汉生物工程学院生命科学与技术学院刘磊磊课题组在International Journal of Biological Macromolecules&#xff08;中科院一区&#xff0c;影响因子8.2&#xff09;期刊在线发表了“Contribution of the transcription factor SfGATAe to Bt Cry to…