SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties>
	<MQTTv3.version>1.2.5</MQTTv3.version>
</properties>

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.eclipse.paho</groupId>
			<artifactId>org.eclipse.paho.client.MQTTv3</artifactId>
			<version>${MQTTv3.version}</version>
		</dependency>
	</dependencies>
</dependencyManagement>

或者直接使用
<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.MQTTv3</artifactId>
	<version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MqttClientConnectorPool {

    public static MqttClient mqttClient;

    /**
     * 连接MQTT客户端
     * @return 获取MQTT连队对象
     */
    public static MqttClient connectMQTT() {

        if (mqttClient != null){
            log.info("已存在,我深深的脑海!");
            return mqttClient;
        }

        try {
            // broker及连接信息
            String broker = "tcp://127.0.0.1:1883";
            String username = "admin";
            String password = "123456";
            String clientId = System.currentTimeMillis() + "";

            //创建MQTT客户端(指定broker、客户端id、消息持久策略)
            mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());

            //创建连接参数配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            //连接超时时间
            options.setKeepAliveInterval(20);
            //是否自动重连
            options.setAutomaticReconnect(true);
            mqttClient.connect(options);
            log.info("MqttClient 服务启动broker初始化!");
        } catch (MqttException e){
            log.error("MqttClient connect Error:{}", e.getMessage());
            e.printStackTrace();
        }

        return mqttClient;
    }

    /**
     * 关闭MQTT客户端
     * @param client client
     */
    public static void closeClient(MqttClient client){
        try {
            // 断开连接
            client.disconnect();
            // 关闭客户端
            client.close();
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 关闭MQTT客户端
     */
    public static void closeStaticClient(){
        try {
            if (mqttClient != null){
                // 断开连接
                mqttClient.disconnect();
                // 关闭客户端
                mqttClient.close();
            }
        } catch (MqttException e){
            log.error("MqttClient disconnect or close Error:{}", e.getMessage());
            e.printStackTrace();
        }
    }
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

@Slf4j
public class MqttMsgSender {

    public void sendMessage(MqttClient client,String topic,String content,int qos){
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        try{
            client.publish(topic,message);
        } catch (MqttException e){
            log.error("MqttClient publish text info Error:{}!", e.getMessage());
            e.printStackTrace();
        }
    }
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MqttMsgSubscriber {

    String broker = "tcp://127.0.0.1:1883";
    String topic = "/deviceUp";
    String username = "admin";
    String password = "123456";
    String clientId = System.currentTimeMillis() + "";
    int qos = 1;
    
    public void readSubscribeTopicMessage(){
        try {
            MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());

            // 连接参数
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            //是否清除会话
            options.setCleanSession(true);
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(60);
            client.setCallback(new MqttCallback() {

                @Override
                public void connectionLost(Throwable throwable) {
                    log.error("连接丢失");
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    log.info("topic为: " + topic);
                    log.info("qos为: " + mqttMessage.getQos());
                    log.info("消息内容为: " + new String(mqttMessage.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // 当消息被完全传送出去后调用
                    log.info("交付完成 ---Delivery complete!");
                    // 可以在这里处理一些发送完成后的清理工作
                }
            });

            client.connect(options);
            client.subscribe(topic, qos);
        } catch (MqttException e){
            log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());
        } catch (Exception e){
            log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());
        }
    }
    
}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@Slf4j
@RestController
@RequestMapping()
public class TestController {

    @GetMapping("/test/mqtt/{msg}")
    public String testSendMqttMsg(@PathVariable("msg") String msg){
        log.info("消息内容:{}.", msg);

        MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();
        MqttMsgSender sender = new MqttMsgSender();

        String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";

        String topic = "/deviceUp";
        int qos = 1;

        if (null != mqttClient){
            sender.sendMessage(mqttClient, topic, content, qos);
        } else {
            log.info("MqttClient为空,无法发送!");
            return "失败!";
        }

        return "成功!";
    }

}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("MqttClientConnectorPool ===================== Startup");
        MqttClientConnectorPool.connectMQTT();
        log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");

        log.info("MqttMsgSubscriber ===================== Startup");
        // 先订阅等待
        MqttMsgSubscriber subscriber = new MqttMsgSubscriber();
        subscriber.readSubscribeTopicMessage();
    }
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {

    @Override
    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        log.info("服务终止! shutdown hook, ContextClosedEvent");
        MqttClientConnectorPool.closeStaticClient();
    }

}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {

    public static void main(String[] args) {
        SpringApplication.run(MqttBrokerServer.class, args);
    }

}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/885207.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Oracle数据恢复—异常断电导致Oracle数据库报错的数据恢复案例

Oracle数据库故障&#xff1a; 机房异常断电后&#xff0c;Oracle数据库启库报错&#xff1a;“system01.dbf需要更多的恢复来保持一致性&#xff0c;数据库无法打开”。数据库没有备份&#xff0c;归档日志不连续。用户方提供了Oracle数据库的在线文件&#xff0c;需要恢复zxf…

仕考网:事业单位考试选岗小技巧!

事业单位的考试选岗阶段&#xff0c;应该综合考量多个方面&#xff0c;确保选择出更合适的岗位&#xff0c;仕考网为大家分享以下技巧&#xff1a; 1. 岗位性质及工作内容 岗位性质:深入了解岗位是管理类、专业技术类还是工勤技能类&#xff0c;以及相应的职责和要求。 工作…

解决 Macos下 Orbstack docker网络问题

两种解决方法&#xff0c;第一种开代理 参考 —— 但是我这一种没成功&#xff0c;第二种方法是换镜像源 { "registry-mirrors": ["http://hub-mirror.c.163.com","https://docker.mirrors.ustc.edu.cn","https://mirrors.tencent.com&q…

openKylin--安装 .net6.0

编辑profile文件 cd .. //切换到根目录 cd /etc //切换到etc目录 vim profile //b编辑profile文件 1. 按→键移动到文件末尾 2. 按Insert键进入编辑模式 3. 按Enter另起一行开始编辑 export DOTNET_ROOT/home/dotnetexport PATH$PATH:/home/dotnet 可以通过右键--粘贴 的…

【Android 14源码分析】WMS-窗口显示-第二步:relayoutWindow -1

忽然有一天&#xff0c;我想要做一件事&#xff1a;去代码中去验证那些曾经被“灌输”的理论。                                                                                  – 服装…

YOLOv11改进策略【损失函数篇】| Slide Loss,解决简单样本和困难样本之间的不平衡问题

一、本文介绍 本文记录的是改进YOLOv11的损失函数&#xff0c;将其替换成Slide Loss&#xff0c;并详细说明了优化原因&#xff0c;注意事项等。Slide Loss函数可以有效地解决样本不平衡问题&#xff0c;为困难样本赋予更高的权重&#xff0c;使模型在训练过程中更加关注困难样…

Docker版MKVtoolnix的安装及中文显示

本文是应网友 kkkhi 要求折腾的&#xff0c;只研究了 MKVtoolnix 的安装及中文显示&#xff0c;未涉及到软件的使用&#xff1b; 什么是 MKVtoolnix &#xff1f; MKVToolnix 是一款功能强大的多媒体处理工具&#xff0c;用于在 Linux、其他 Unix 系统和 Windows 上创建、修改和…

SpringBoot--为什么Controller是串行的?怎样才能并行?

原文网址&#xff1a;SpringBoot--为什么Controller是串行的&#xff1f;怎样才能并行&#xff1f;-CSDN博客 简介 本文介绍SpringBoot为什么Controller是串行的&#xff1f;在什么场景下才能并行执行&#xff1f; 大家都知道&#xff0c;SpringBoot的Controller按理是并行执…

1.2.1 HuggingFists安装说明-Linux安装

Linux版安装说明 下载地址 【GitHub】https://github.com/Datayoo/HuggingFists 【百度网盘】https://pan.baidu.com/s/12-qzxARjzRjYFvF8ddUJQQ?pwd2024 安装说明 环境要求 操作系统&#xff1a;CentOS7 硬件环境&#xff1a;至少4核8G&#xff0c;系统使用Containerd…

IIS HTTPS 网页可能暂时无法连接,或者它已永久性地移动到了新网址 ERR_HTTP2_INADEQUATE_TRANSPORT_SECURITY

问题描述&#xff1a;站点突然无法访问&#xff0c;经排查发现&#xff0c;HTTP协议的网址可以继续访问&#xff0c;HTTPS的网址不可以访问。 问题分析&#xff1a;在Windows更新和滚动之后&#xff0c;由于 HTTP/2&#xff0c;当站点启动了 HTTP/2 连接&#xff0c;会出现一个…

通过PHP获取商品详情

在电子商务的浪潮中&#xff0c;数据的重要性不言而喻。商品详情信息对于电商运营者来说尤为宝贵。PHP&#xff0c;作为一种广泛应用的服务器端脚本语言&#xff0c;为我们提供了获取商品详情的便捷途径。 了解API接口文档 开放平台提供了详细的API接口文档。你需要熟悉商品详…

【JavaEE初阶】网络原理

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xff0c;欢迎指出~ 目录 ⽹络互连 IP地址 端口号 协议 协议分层 优势 TCP/IP 五层网络模型 数据在网络通信中的整体流程 封装和分用 封装 分用 ⽹络互连 随着时代的发展&#xff0c;越来越需…

若依框架使用教程

1、若依介绍 1.1什么是低代码开发平台&#xff1f; 低代码诞生的目的是将可<font style"color:rgb(51, 51, 51);background-color:rgb(248, 248, 248);">重复性的编程</font>工作通过<font style"color:rgb(51, 51, 51);background-color:rgb(2…

.Net 6.0 监听Windows网络状态切换

上次发了一个文章获取windows网络状态&#xff0c;判断是否可以访问互联网。传送门&#xff1a;获取本机网络状态 这次我们监听网络状态切换&#xff0c;具体代码如下&#xff1a; public class WindowsNetworkHelper {private static Action<bool>? _NetworkStatusCh…

js列表数据时间排序和取唯一值

1.取唯一值[...new Set(array)] const array [1, 2, 3, 2, 4, 5, 3, 5]; // 使用Set去除重复元素 const uniarray [...new Set(array)]; console.log(uniarray); // 输出: [1, 2, 3, 4, 5] 2.排序 var u [1,3,2,5,4]; var uu u.sort(); console.log(uu); var u [1,3…

求组合数专题

求组合数 Ⅰ&#xff08;递推公式&#xff09; 思路 递推法预处理 利用公式 复杂度 直接查询 单次查询复杂度 代码 #include <bits/stdc.h> using namespace std; const int N 2010; const int mod 1e97; int c[N][N]; int get_c(int a, int b) {c[0][0] 1;for(i…

9700万个新岗位涌现,AI失业焦虑背后:超千万人找到了新工作

随着技术的飞速进步&#xff0c;全球就业市场正经历着翻天覆地的变化。 《2023年未来就业报告》预计&#xff0c;未来五年将新增近7000万个工作岗位&#xff0c;同时淘汰8300万个旧岗位。 在中国&#xff0c;城市如武汉和东莞正成为新就业机会的热土&#xff0c;无人驾驶、人…

C++ | Leetcode C++题解之第433题最小基因变化

题目&#xff1a; 题解&#xff1a; class Solution { public:int minMutation(string start, string end, vector<string>& bank) {int m start.size();int n bank.size();vector<vector<int>> adj(n);int endIndex -1;for (int i 0; i < n; i)…

数据结构2——单链表

在数据结构1——顺序表&#xff08;C语言版&#xff09;中&#xff0c;我们已经了解了顺序表的使用和实现&#xff0c;总结一下顺序表的优点&#xff1a; ①尾插尾删效率足够快&#xff1b; ②下标的随机访问和修改也足够方便。 可除此之外顺序表也确实存在着不足&#xff1a; …

随手记:牛回速归

上周-国庆前&#xff1a;牛回速归 国庆&#xff1a;小心被套住 国庆后&#xff1a;一片迷茫 总结&#xff1a;要是上周到国庆前的基本都能捞到&#xff0c;后面情况不好说 后续持续更新