SpringBoot集成MQTT实现交互服务通信

引言

本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。

二、发布/订阅模式

发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。

主要组成部分

  1. 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。

  2. 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。

  3. 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。

优点

  • 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。

  • 灵活性:可以动态添加或删除订阅者,不影响其他组件。

  • 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。

缺点

  • 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。

  • 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。

应用场景

  • 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。

  • 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。

  • 分布式系统:用于跨系统或跨服务的消息传递。

发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。

三、Windows下安装MQTT消息服务器

非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。

  • 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public

如果报错缺少Erlang环境,需要自行安装下该环境

在这里插入图片描述
在这里插入图片描述

浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过

在这里插入图片描述

四、Windows安装MQTT消息代理客户端MQTTX

下载地址:MQTTX下载地址

点击免费下载
在这里插入图片描述

选择64位版本

在这里插入图片描述
下好后点击安装,启动运行界面如下:
在这里插入图片描述
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。

五、新建MQTT集成项目

随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:

        <!-- MQTT -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

5.1 yml配置

server:
  port: 8081

#允许循环依赖
spring:
  main:
    allow-circular-references: true

customer:
  mqtt:
    broker: tcp://localhost:1883
    clientList:
      #发布客户端ID
      - clientId: nays_service
        #监听主题 同时订阅多个主题 使用 - 分割开
        subscribeTopic: mqtt/publish
        #用户名
        userName: admin
        #密码
        password: public
      #接收客户端ID
      - clientId: receive_service
        #监听主题 同时订阅多个主题 使用 - 分割开
        subscribeTopic: mqtt/receive
        #用户名
        userName: admin
        #密码
        password: public



5.2 Mqtt配置类

package com.hulei.mqttproject.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Mqtt配置类
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}

5.3 MQTT客户端

package com.hulei.mqttproject.config;

import lombok.Data;


/**
 * MQTT客户端
 */
@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}

5.4 MQTT客户端管理类

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 */
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     *
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !userName.isEmpty()) {
                connOpts.setUserName(userName);
            }

            if (null != password && !password.isEmpty()) {
                connOpts.setPassword(password.toCharArray());
            }

            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }

            //连接mqtt服务端broker
            client.connect(connOpts);
            // 订阅主题
            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                if (subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));
                else {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}

5.5 MQTT客户端创建

package com.hulei.mqttproject.config;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * MQTT客户端创建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}

5.6 MQTT回调抽象类

package com.hulei.mqttproject.config;

import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT回调抽象类
 */
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    private String clientId;

    private MqttConnectOptions connectOptions;

    @Resource
    MqttClientManager mqttClientManager;

    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != connectOptions) {
                    mqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                } else {
                    mqttClientManager.getMqttClientById(clientId).connect();
                }
            }

        } catch (Exception e) {
            log.error("{} reconnect failed!", e.getMessage(), e);
        }
    }

    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload());
     	handleReceiveMessage(topic, content);
    }

    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }


    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}


5.7 MQTT订阅回调环境类

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * MQTT订阅回调环境类
 */
@Component
@Slf4j
public class MqttCallBackContext {

    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

5.8 默认回调类

package com.hulei.mqttproject.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 默认回调
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    /**
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("接收到主题---{}", topic);
        log.info("接收到消息---{}", message);
        // 自定义消息处理业务

    }
}

六、测试服务类

package com.hulei.mqttproject.controller;

import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class SendController {

    @Resource
    private MqttClientManager mqttClientManager;

    @RequestMapping("/sendMessage")
    public String sendMessage(String topic){
        try {
            MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
            mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);
            return "发送成功";
        } catch (Exception e) {
            log.error("发送失败",e);
            return "发送失败";
        }
    }
}

七、启动springboot

启动日志可以看到,mqtt消息服务器连接成功

在这里插入图片描述
EMQX工具显示发布客户端和接收客户端均已成功注册

在这里插入图片描述

使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/843395.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[Armbian] 部署Docker版Home Assistent,安装HACS并连接米家设备

title: [Armbian] 部署Docker版Home Assistent&#xff0c;安装HACS并连接米家设备 date: 2024-07-21T10:51:23Z lastmod: 2024-07-21T11:40:39Z [Armbian] 部署Docker版Home Assistent&#xff0c;安装HACS并连接米家设备 官网&#xff1a;Home Assistant (home-assistant.i…

sql常见50道查询练习题

sql常见50道查询练习题 1. 表创建1.1 表创建1.2 数据插入 2. 简单查询例题(3题&#xff09;2.1 查询"李"姓老师的数量2.2 查询男生、女生人数2.3 查询名字中含有"风"字的学生信息 3. 日期相关例题(6题&#xff09;3.1 查询各学生的年龄3.2 查询本周过生日的…

Yolo-World网络模型结构及原理分析(一)——YOLO检测器

文章目录 概要一、整体架构分析二、详细结构分析YOLO检测器1. Backbone2. Head3.各模块的过程和作用Conv卷积模块C2F模块BottleNeck模块SPPF模块Upsampling模块Concat模块 概要 尽管YOLO&#xff08;You Only Look Once&#xff09;系列的对象检测器在效率和实用性方面表现出色…

【GraphRAG】微软 graphrag 效果实测

GraphRAG 本文将基于以下来源&#xff0c;对Microsoft GraphRAG分析优缺点、以及示例实测分析。 1. Source 代码仓库&#xff1a; Welcome to GraphRAGhttps://microsoft.github.io/graphrag/ 微软文章1&#xff08;2024.2.13&#xff09;&#xff1a;GraphRAG: Unlocking…

通过albumentation对目标检测进行数据增强(简单直接)

albumentation官方文档看不懂&#xff1f;xml文件不知道如何操作&#xff1f;下面只需要修改部分代码即可上手使用 要使用这个方法之前需要按照albumentation这个库还有一些辅助库,自己看着来安装就行 pip install albumentation pip install opencv-python pip install json…

阿尔泰科技利用485模块搭建自动灌溉系统实现远程控制

自动灌溉系统又叫土壤墒情监控系统&#xff0c;土壤墒情监控系统主要实现固定站无人值守情况下的土壤墒情数据的自动采集和无线传输&#xff0c;数据在监控中心自动接收入库&#xff1b;可以实现24小时连续在线监控并将监控数据通过有线、无线等传输方式实时传输到监控中心生成…

破解反爬虫策略 /_guard/auto.js(二)实战

这次我们用上篇文章讲到的方法来真正破解一下反爬虫策略&#xff0c;这两个案例是两个不同的网站&#xff0c;一个用的是 /_guard/auto.js&#xff0c;另一个用的是/_guard/delay_jump.js。经过解析发现这两个网站用的反爬虫策略基本是一模一样&#xff0c;只不过在js混淆和生成…

FOG Project 文件名命令注入漏洞复现(CVE-2024-39914)

0x01 产品简介 FOG是一个开源的计算机镜像解决方案,旨在帮助管理员轻松地部署、维护和克隆大量计算机。FOG Project 提供了一套功能强大的工具,使用户能够快速部署操作系统、软件和配置设置到多台计算机上,从而节省时间和精力。该项目支持基于网络的 PXE 启动、镜像创建和还…

Python | Leetcode Python题解之第240题搜索二维矩阵II

题目&#xff1a; 题解&#xff1a; class Solution:def searchMatrix(self, matrix: List[List[int]], target: int) -> bool:m, n len(matrix), len(matrix[0])x, y 0, n - 1while x < m and y > 0:if matrix[x][y] target:return Trueif matrix[x][y] > tar…

使用崖山YMP 迁移 Oracle/MySQL 至YashanDB 23.2 验证测试

前言 首届YashanDB「迁移体验官」开放后&#xff0c;陆续收到「体验官」们的投稿&#xff0c;小崖在此把优秀的投稿文章分享给大家~今天分享的用户文章是《使用崖山YMP 迁移 Oracle/MySQL 至YashanDB 23.2 验证测试》&#xff08;作者&#xff1a;尚雷&#xff09;&#xff0c…

数据结构(队列及其实现)

概念与结构 概念&#xff1a;只允许在⼀端进⾏插⼊数据操作&#xff0c;在另⼀端进⾏删除数据操作的特殊线性表&#xff0c; 队列具有先进先出FIFO(First In First Out)原则。 ⼊队列&#xff1a;进⾏插⼊操作的⼀端称为队尾 出队列&#xff1a;进⾏删除操作的⼀端称为队头…

MyBatis 持久层框架-上

一、Mybatis 简介 1. 简介 MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来配置和映射原始类型、接口和Java POJO 为数据库中的记…

使用SpringAOP解决日志记录问题+获取MyBatis执行的SQL语句(企业中常用的日志审计功能)

前言 需求是这样的&#xff1a;每个接口都有不同的数据库操作。想要将这些请求和数据库操作放到日志当中&#xff0c;方便管理员查看有哪些操作被执行了。这里排除查询操作&#xff0c;只在日志中记录 update、insert、delete 这三个操作。期望的日志表中应该有每次执行的 sql …

基于电鸿(电力鸿蒙)的边缘计算网关,支持定制

1 产品信息 边缘计算网关基于平头哥 TH1520 芯片&#xff0c;支持 OpenHarmony 小型系统&#xff0c;是 连接物联网设备和云平台的重要枢纽&#xff0c;可应用于城市基础设施&#xff0c;智能工厂&#xff0c;智能建筑&#xff0c;营业网点&#xff0c;运营 服务中心相关场…

RK3568笔记四十一:DHT11驱动开发测试

若该文为原创文章&#xff0c;转载请注明原文出处。 记录开发单总线&#xff0c;读取DHT11温湿度 一、DHT11介绍 DHT11是串行接口&#xff08;单线双向&#xff09;DATA 用于微处理器与 DHT11之间的通讯和同步&#xff0c;采用单总线数据格式&#xff0c;一次通讯时间4ms左右…

好用的AI搜索引擎

1. 360AI 搜索 访问 360AI 搜索: https://www.huntagi.com/sites/1706642948656.html 360AI 搜索介绍&#xff1a; 360AI 搜索&#xff0c;新一代智能答案引擎&#xff0c;值得信赖的智能搜索伙伴&#xff0c;为复杂搜索提供专业支持&#xff0c;解锁更相关、更全面的答案。AI…

视频汇聚,GB28181,rtsp,rtmp,sip,webrtc,视频点播等多元异构视频融合,视频通话,视频会议交互方案

现在视频汇聚&#xff0c;视频融合和视频互动&#xff0c;是视频技术的应用方向&#xff0c;目前客户一般有很多视频的业务系统&#xff0c;如已有GB28181的监控&#xff08;GB现在是国内主流&#xff0c;大量开源接入和商用方案&#xff09;&#xff0c;rtsp设备&#xff0c;音…

建筑集团工程地产类公司网站源码系统 带完整的安装代码包以及搭建部署教程

系统概述 在数字化浪潮的推动下&#xff0c;建筑行业正经历着前所未有的变革。为了提升企业形象&#xff0c;优化客户体验&#xff0c;加强项目管理&#xff0c;建筑集团工程地产类公司急需一套高效、易用的网站源码系统。小编给大家分享一款专为建筑行业量身定制的网站源码系…

C语言 ——— 在控制台上打印动态变化的菱形

目录 代码要求 代码实现 代码要求 输入 整数line &#xff0c;菱形的上半部分的长度就为line&#xff08;动态变化的菱形&#xff09; 菱形由 "*" 号构成 代码实现 #include<stdio.h> int main() {// 上半长int line 0;scanf("%d", &line)…

MYSQL——库表操作

MYSQL——库表操作 1.1 SQL语句基础1.1.1. SQL简介1.1.2. SQL语句分类1.1.3. SQL语句的书写规范 1.2 数据库的操作1.2.1 数据库的登录及退出1.2.2 查看数据库1.2.3 创建数据库1.2.4 切换数据库1.2.5 查看当前用户1.2.6 删除数据库 1.3 MySQL字符集1.3.1. 字符集1.3.2. 字符序1.…