若依前后分离版框架下Springboot java引入Mqtt接受发送消息

**这只是其中一种而且是粗浅的接、发消息。
同步机制还要跟搞物联网的同事沟通确认去看看能不能实现 或者是设备比较多的情况下 不会去使用同步机制
首先pom文件 引入依赖
**

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

其次配置文件mqtt配置,我这是yml,其他配置文件写法需要改动下

mqtt:
    username: ****** # 用户名
    password: ****** # 密码
    hostUrl: tcp://******:1883 # tcp://ip:端口
    clientId: clientId # 客户端id
    defaultTopic: electric/#,test # 订阅主题  electric/#表示以electric/开头的主题都可以接受到
    timeout: 100 # 超时时间 (单位:秒)
    keepalive: 60 # 心跳 (单位:秒)
    enabled: true # 是否使用mqtt功能

**接下来到了代码层面了
先创建一个yml文件的实体类 MqttConfig
prefix = 这里地址看你自己的配置
@ConfigurationProperties(prefix = “mqtt”)
**



import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttConfig {
    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 连接地址
     */
    private String hostUrl;
    /**
     * 客户Id
     */
    private String clientId;
    /**
     * 默认连接话题
     */
    private String defaultTopic;
    /**
     * 超时时间
     */
    private int timeout;
    /**
     * 保持连接数
     */
    private int keepalive;
    /**
     * mqtt功能使能
     */
    private boolean enabled;
    private boolean retained;
    /**
     * qos
     */
    private int qos;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getHostUrl() {
        return hostUrl;
    }

    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getDefaultTopic() {
        return defaultTopic;
    }

    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }

    public boolean isEnabled() {
        return enabled;
    }

    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
    public int getQos() {
        return qos;
    }

    public void setQos(int qos) {
        this.qos = qos;
    }



    @Bean
    public MqttPushClient getMqttPushClient() {
        if(enabled == true){
            String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
            mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttPushClient;
    }
}

**这里在创建 MqttPushClient 文件
去链接客户端、发消息、订阅主题 功能都在这里
**

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布消息
     *
     * @param pubTopic 主题
     * @param message 内容
     * @param qos   连接方式
     */
    public  static void publishMessage(String pubTopic, String message, int qos) {
            System.out.println("发布消息   "+client.isConnected());
            System.out.println("id:"+client.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = client.getTopic(pubTopic);

            if(null != topic) {
                try {
                    MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if(!publish.isComplete()) {
                        logger.info("发布消息成功");
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public static void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}

**再创建一个继承回调方法的接口 PushCallback
**

package com.ruoyi.util.mqttUtil;

import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    private static String _topic;
    private static String _qos;
    private static String _msg;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,一般在这里面进行重连
        logger.info("连接断开,可以做重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题 : " + topic);
        logger.info("接收消息Qos : " + mqttMessage.getQos());
        logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));

        _topic = topic;
        _qos = mqttMessage.getQos()+"";
        _msg = new String(mqttMessage.getPayload());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("发布消息成功");
        //发布消息成功之后 才会调用这里 大家可以仔细看看token里面 后续同步机制也是利用这个token去完成
        logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }

    //别的Controller层会调用这个方法来  获取  接收到的硬件数据
    public String receive() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topic", _topic);
        jsonObject.put("qos", _qos);
        jsonObject.put("msg", _msg);
        return jsonObject.toString();
    }

}

到这就需要去下载个 MQTTX 跟服务器直接互相接发消息了

下图红框内的随意填写 服务器地址、端口、用户名、密码使用java代码配置文件里面的
在这里插入图片描述

在这里插入图片描述

往下就是连接上 添加一个订阅,记得 这个订阅要在你在配置文件里面哦 什么名字都ok /#相当于模糊查询
在这里插入图片描述
好了 这里可以启动项目了 控制台会打印咱们订阅的主题的,也就是说这些主题给咱们发消息 会直接被咱们接受的
在这里插入图片描述

启动项目 由于咱们的配置文件里订阅了test这个主题 我在mqttx里面直接给 test这个主题发送信息

框住的地方是什么就是给那个主题发消息

控制台自动打印 订阅的test主题信息
在这里插入图片描述
**到这里的话 接受消息就完事了 就要搞下发消息了
随便找个controller弄个请求搞一下 **

    @RequestMapping("/send")
    @ResponseBody
    private ResponseEntity<String> send() throws MqttException {
       System.out.println("我是springboot发送的数据");
       //三个参数 第一个是什么主题,第二个发送内容,第三个是
        MqttPushClient.publishMessage("clientId1","-===============",1);
        return new ResponseEntity<>("OK", HttpStatus.OK);
    }

在这里插入图片描述

在这里插入图片描述

已分享完毕,只是很基础的应用 另过几天如果项目有需求会在这继续完善同步mqtt请求的后续 如果接受不到消息 一定要看看订阅的主题对应起来没

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/146493.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

​TechSmith Camtasia 2024破解版功能介绍及使用教程

在现在的网络互联网时代&#xff0c;越来越多的人走上了自媒体的道路。有些自媒体人会自己在网络上录制精彩视频&#xff0c;也有一些人会将精彩、热门的电影剪辑出来再加上自己给它的配音&#xff0c;做成大家喜欢看的电影剪辑片段。相信不管大家是自己平时有独特的爱好也好、…

如何将微软 Office 宏转换为 ONLYOFFICE 宏

想要将微软 Office VBA 宏转换为可在 ONLYOFFICE 中无缝使用的宏&#xff1f;嗯&#xff0c;虽然这种需求并没有直接的解决方案&#xff0c;不过我们也会在本文中介绍 VBA 宏的转换步骤——正好我们手上也有一个来自用户的实际案例可供参考。 VBA 宏 以下是原始的 VBA 宏代码&…

ubuntu18.04配置Java环境与安装RCS库

一、安装包 安装包 二、JAVA环境 java无需安装&#xff0c;只需要下载解压&#xff0c;然后配置正确的路径到环境变量种即可使用。 1.创建文件JAVA mkdir JAVA 2.将安装包复制到该文件夹下&#xff0c;并解压缩 tar -zxvf tar -zxvf jdk1.8.0_191.tar.gz 3.在home路径下…

Excel-lookup函数核对两个表格的数据匹配

需求描述&#xff1a;把右侧表格里的成绩按照姓名匹配到左表中 D11函数为LOOKUP(1,0/($H$11:$H$26A11),I$11:I$26) 然后下拉赋值公式&#xff0c;那么得到的值就都是对应的

STM32中使用看门狗实现系统自动复位

STM32中的看门狗(Watchdog)是一种用于监控系统运行状态并在系统故障或死锁时执行自动复位的硬件功能。在本文中&#xff0c;我将介绍如何在STM32微控制器中使用看门狗来实现系统的自动复位。下面是详细的解释&#xff1a; 一、看门狗原理简介 看门狗是一种独立的硬件计时器&am…

竞赛选题 深度学习的水果识别 opencv python

文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 深度学习…

HTTP/2.0协议详解

前言 HTTP/2.0&#xff1a;互联网通信的革新标准 随着互联网技术的飞速发展&#xff0c;HTTP协议作为互联网应用最广泛的通信协议&#xff0c;也在不断演进和优化。HTTP/2.0是HTTP协议的最新版本&#xff0c;它旨在提供更高效、更安全、更快速的互联网连接。 一、HTTP/2.0的…

解密图像处理中的利器——直方图与均衡化

直方图与均衡化是数字图像处理中常用的重要工具&#xff0c;它们能够帮助我们更好地理解和改善图像的亮度分布。本文将首先介绍直方图的基本概念以及其在图像处理中的意义&#xff0c;接着详细阐述直方图均衡化的原理和算法。同时&#xff0c;文章将探讨直方图均衡化在图像增强…

EasyExcel入门使用教程

文章目录 简介一、工程创建&#x1f391;二、读操作&#x1f38a;二、写操作&#x1f384;总结 简介 数据导入导出意义 后台管理系统是管理、处理企业业务数据的重要工具&#xff0c;在这样的系统中&#xff0c;数据的导入和导出功能是非常重要的&#xff0c;其主要意义包括以下…

java轮播图接口实现

一. 内容简介 实现java后端用户管理接口&#xff0c;数据库使用msyql。 二. 软件环境 2.1 java 1.8 2.2 mysql Ver 8.0.13 for Win64 on x86_64 (MySQL Community Server - GPL) 2.3 IDEA ULTIMATE 2019.3 2.4d代码地址 https://gitee.com/JJW_1601897441/competitionAs…

k8s_base

应用程序在服务器上部署方式的演变,互联网发展到现在为止 应用程序在服务器上部署方式 历经了3个时代1. 传统部署 优点简单 缺点就是操作系统的资源是有限制的&#xff0c;比如说操作系统的磁盘&#xff0c;内存 比如说我8G&#xff0c;部署了3个应用程序&#xff0c;当有一天…

实现Vue3 readonly,教你如何一步步重构

本文通过实现readonly方法&#xff0c;一步步展示重构的流程。 前言 readonly接受一个对象&#xff0c;返回一个原值的只读代理。 实现 Vue3 中readonly方法&#xff0c;先来看一下它的使用。 <script setup> import { readonly } from "vue";let user {n…

Spring Security OAuth2.0 实现分布式系统的认证和授权

Spring Security OAuth2.0 实现分布式系统的认证和授权 1. 基本概念1.1 什么是认证&#xff1f;1.2 什么是会话&#xff1f;1.2.1 基于 session 的认证方式1.2.2 基于 token 的认证方式 1.3 什么是授权&#xff1f;1.3.1 授权的数据模型 1.4 RBAC 介绍 2. Spring Security2.1 S…

Spring-Spring之AOP底层原理解析---实践(动态代理)

动态代理 代理模式的解释&#xff1a;为其他对象提供一种代理以控制对这个对象的访问&#xff0c;增强一个类中的某个方法&#xff0c;对程序进行扩展。 cglib动态代理 方式一&#xff1a; public class UserService {public void test() {System.out.println("test..…

hadoop 大数据环境配置 配置jdk, hadoop环境变量 配置centos环境变量 hadoop(五)

1. 遗漏一步配置系统环境变量&#xff0c;下面是步骤&#xff0c;别忘输入更新系统环境命令 2. 将下载好得压缩包上传至服务器&#xff1a; /opt/module 解压缩文件存放地址 /opt/software 压缩包地址 3. 配置环境变量&#xff1a; 在/etc/profile.d 文件夹下创建shell文件 …

【Nginx】CentOS 安装Nignx

CentOS上安装Nginx&#xff1a; 1. 打开终端&#xff1a;使用SSH或者直接在服务器上打开终端。 2. 更新系统&#xff1a;运行以下命令以确保您的系统软件包列表是最新的&#xff1a; sudo yum update3. 安装Nginx&#xff1a;运行以下命令以安装Nginx&#xff1a; sudo yum…

性能测试 —— Jmeter分布式测试的注意事项和常见问题

Jmeter是一款开源的性能测试工具&#xff0c;使用Jmeter进行分布式测试时&#xff0c;也需要注意一些细节和问题&#xff0c;否则可能会影响测试结果的准确性和可靠性。 Jmeter分布式测试时需要特别注意的几个方面 1. 参数化文件的位置和内容 如果使用csv文件进行参数化&…

人工智能基础_机器学习030_ElasticNet弹性网络_弹性回归的使用---人工智能工作笔记0070

然后我们再来看elastic-net弹性网络,之所以叫弹性是因为,他融合了L1和L2正则,可以看到 他的公式 公式中有L1正则和L2正则两个都在这个公式中 可以看到弹性网络,在很多特征互相联系的时候,非常有用,比如, 相关性,如果数学好,那么物理也好,如果语文好,那么英语也好 这种联系 正…

[论文分享] Never Mind the Malware, Here’s the Stegomalware

Never Mind the Malware, Here’s the Stegomalware [IEEE Security & Privacy 2022] Luca Caviglione | National Research Council of Italy Wojciech Mazurczyk | Warsaw University of Technology and FernUniversitt in Hagen 近年来&#xff0c;隐写技术已逐渐被观…

处理本地DNS劫持,导致域名解析失败

回顾情形 在公司内网&#xff0c;有同事反馈appstoreconnect.apple.com网站需要通过代理才能正常访问。 向我这边询问&#xff0c;否修改过路由策略&#xff1b; 检测域名 通过域名ping检测网址&#xff0c;https://ping.chinaz.com/astrill.com&#xff0c;来ping域名apps…