springboot集成mqtt

文章目录

  • 前言
  • 一、MQTT是什么?
  • 二、继承步骤
    • 1.安装MQTT
    • 2.创建项目,引入依赖
    • 3. 对应步骤2的代码
    • 3 测试
  • 总结
    • mqtt 启动后访问地址


前言

随着物联网的火热,MQTT的应用逐渐增多

曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;


一、MQTT是什么?

可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。
MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。
MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。


个人简单总结:

  1. 每个客户端可以订阅一个或者多个主题(发消息,收消息)
  2. 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息)
  3. 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
--- 
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题

二、继承步骤

1.安装MQTT

这里直接采用windows版本,解压版,比较快

  • 下载地址 MQTT-windows版本
  • 解压后,在bin文件下执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/public
    MQTT管理页面

2.创建项目,引入依赖

大致分为如下步骤:

  • yml配置 主题 用户名 密码
  • 根据配置创建客户端实例,实例订阅主题
  • 实现 MqttCallback 接口
    1. 重连处理 connectionLost
    2. 消息接受处理 messageArrived
    3. 消息发生成功处理 deliveryComplete
    
  • 根据客户端信息发送某个主题的消息

3. 对应步骤2的代码

  1. yml配置
server:
  port: 8081
# 下面这里要看你自己的需求
customer:
  mqtt:
    broker: tcp://127.0.0.1:1883
    clientList:
      #发布客户端ID
      - clientId: nxys_service
        #监听主题 同时订阅多个主题使用 - 分割开
        subscribeTopic: mqtt/publish
        #用户名
        userName: admin
        #密码
        password: public
      #接受客户端ID
      - clientId: receive_service
        #监听主题 同时订阅多个主题使用 - 分割开
        subscribeTopic: mqtt/receive
        #用户名
        userName: admin
        #密码
        password: public


  1. 实例信息获取
/**
 * Mqtt配置类
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}
/**
 * MQTT客户端
 */
@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}
  1. 根据信息创建实例,订阅主题
/**
 * MQTT客户端创建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Autowired
    private MqttConfig mqttConfig;

    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            //创建客户端,客户端ID:demo,回调类跟客户端ID一致
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}
/**
 * MQTT客户端管理类,如果客户端非常多后续可入redis缓存
 */
@Slf4j
@Component
public class MqttClientManager {
    @Value("${customer.mqtt.broker}")
    private String mqttBroker;
    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public static MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     *
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     * @param userName       用户名,可为空
     * @param password       密码,可为空
     * @return mqtt客户端
     */
    public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (null != userName && !"".equals(userName)) {
                connOpts.setUserName(userName);
            }

            if (null != password && !"".equals(password)) {
                connOpts.setPassword(password.toCharArray());
            }

            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                if (null == callBack) {
                    callBack = mqttCallBackContext.getCallBack("default");
                }

                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);
            }

            //连接mqtt服务端broker
            client.connect(connOpts);
            // 订阅主题
            if (null != subscribeTopic && !"".equals(subscribeTopic)) {
                if (subscribeTopic.contains("-"))
                    client.subscribe(subscribeTopic.split("-"));
                else
//                    if (!subscribeTopic.equals("mqtt/receive"))
                {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);
        } catch (MqttException e) {
            log.error("Create mqttClient failed!", e);
        }
    }
}
  1. 实现 MqttCallback 接口
/**
 * MQTT回调抽象类
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
    private String clientId;

    private MqttConnectOptions connectOptions;
 
    public String getClientId() {
        return clientId;
    }
 
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
 
    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }
 
    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }
 
    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {
        try {
            if (null != clientId) {
                if (null != dconnectOptions) {
                    MqttClientManager.getMqttClientById(clientId).connect(connectOptions);
                } else {
                    MqttClientManager.getMqttClientById(clientId).connect();
                }
            }
 
        } catch (Exception e) {
            log.error("{} reconnect failed!", e);
        }
    }
 
    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload());
     	handleReceiveMessage(topic, content);
    }
 
    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }
 
 
    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}

/**
 * 默认回调
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    /**
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("接收到主题---{}", topic);
        log.info("接收到消息---{}", message);
        // 你自己的消息处理业务

    }
}
/**
 * MQTT订阅回调环境类
 */
@Component
@Slf4j
public class MqttCallBackContext {
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.clear();
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}
  1. 发送消息
@RestController
public class SendController {
    @Resource
    MqttClientManager mqttClientManager;

    @RequestMapping("/sendMessage")
    public String sendMessage(String topic){
        try {
            MqttMessage mqttMessage = new MqttMessage("你好".getBytes());
            mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);
            return "发送成功";
        } catch (Exception e) {
            e.printStackTrace();
            return "发送失败";
        }
    }
}

3 测试

  1. 启动订阅,查看MQTT 管理页面
    两个实例
  2. 测试发送消息,查看发送情况,接受情况
    http://localhost:8081/sendMessage?topic=mqtt/receive
    发送成功,并接受到消息

总结

文中涉及的所有代码: MQTT-Demo

mqtt 启动后访问地址

http://localhost:18083/#/

  • 用户名/密码:
  • admin/public

  1. 每个客户端可以订阅一个或者多个主题
  2. 每个客户端不订阅主题,也可以发送主题消息
  3. 客户端A发送消息给客户端B流程为:
客户端A>>>Broker>>>客户端B
--- 
前置条件:
a: 客户端A 发送主题消息,且与客户端B的订阅主题一致
b: 客户端B 订阅主题

mqtt启动命令
在bin目录下,cmd 执行

.\emqx console

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404999.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

node 之 初步认识

思考&#xff1a;为什么JavaScript可以在浏览器中被执行 代执行的js代码——JavaScript解析引擎 不同的浏览器使用不同的JavaScript解析引擎 Chrome 浏览器 》 V8 Firefox浏览器 》OdinMonkey(奥丁猴&#xff09; Safri浏览器 》JSCore IE浏览器 》Chakra(查克拉&#xff09; e…

[VulnHub靶机渗透] HA: Narak

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

ubuntu20.04 安装 matlab R2023b

ubuntu20.04 使用matlab R2023b 起因步骤问题问题1问题2问题3 起因 闲着没事&#xff0c;想在ubuntu上安装matlab。 步骤 这个博客写得很好&#xff0c;我就不赘述了&#xff1a;参考博客 。但有点不一样&#xff1a;我现在matlab官网上下载的linux版本不是iso镜像文件&…

计算机设计大赛 深度学习二维码识别

文章目录 0 前言2 二维码基础概念2.1 二维码介绍2.2 QRCode2.3 QRCode 特点 3 机器视觉二维码识别技术3.1 二维码的识别流程3.2 二维码定位3.3 常用的扫描方法 4 深度学习二维码识别4.1 部分关键代码 5 测试结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天…

第3.5章:StarRocks数据导入——Broker Load

注&#xff1a;本篇文章阐述的是StarRocks-3.2版本的Broker Load导入机制 一、概述 Broker Load导入方式支持从HDFS类的外部存储系统&#xff08;例如&#xff1a;HDFS、阿里OSS、腾讯COS、华为云OBS等&#xff09;&#xff0c;支持Parquet、ORC、CSV、及 JSON 四种文件格式&a…

一个非常强大的可视化.NET开源任务调度框架

在项目开发中&#xff0c;任务调度的场景非常多&#xff0c;比如每天定时发送邮件、延迟1小时处理、长时间任务&#xff08;数据导入、图像处理或文件转换&#xff09;等一些业务场景&#xff0c;我们日常做法可能会编写一个后台服务项目解决这些场景。 今天给大家推荐一个开源…

Spring6学习技术|Junit

学习材料 尚硅谷Spring零基础入门到进阶&#xff0c;一套搞定spring6全套视频教程&#xff08;源码级讲解&#xff09; Junit 背景 背景就是每次Test都要重复创建容器&#xff0c;获取对象。就是ApplicationContext和getBean两个语句。通过Spring整合Junit&#xff0c;可以…

Linux(ACT)权限管理

文章目录 一、 ATC简介二、 案例1. 添加测试目录、用户、组&#xff0c;并将用户添加到组2. 修改目录的所有者和所属组3. 设定权限4. 为临时用户分配权限5. 验证acl权限 6. 控制组的acl权限 一、 ATC简介 ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xf…

【JavaScript 语法】

JavaScript 语法 ■ JavaScript 是什么■ JavaScript 语法■ JS 注释■ JS 结束符■ JS 输入输出语句■ JS 代码块■ JS var和let 作用域■ JS var和let 全局变量■ JS const 常量/对象/数组■ JS 关键词■ JS 值■ JS 字面量 &#xff08;混合值&#xff09;■ JS 变量&#x…

C语言——实用调试技巧——第1篇——(第22篇)

坚持就是胜利 文章目录 一、什么是bug?二、调试是什么&#xff1f;有多重要&#xff1f;三、debug 和 release 的介绍&#xff1f;1、2、3、 四、windows环境调试介绍1、调试环境的准备2、学会快捷键F5 或者 Fn F5条件断点 Ctrl F5F9 或者 Fn F9F10 或者 Fn F10F11 或者 F…

文件操作IO

文件操作IO .认识文件树型结构组织 和 目录文件路径&#xff08;Path&#xff09;其他知识Java 中操作文件构造方法方法 创建文件删除文件创建目录重命名文件内容的读写 —— 数据流Reader/Writer(字符流)InputStream/OutputStreadm(字节流)scanner 例题1.扫描指定目录&#xf…

SpringMVC 学习(一)之 SpringMVC 介绍

目录 1 MVC 介绍 2 SpringMVC 介绍 2.1 SpringMVC 特点 2.2 SpringMVC 的核心组件 2.3 SpringMVC 执行流程 3 参考文档 1 MVC 介绍 MVC (Model View Controller) 是一种设计思想&#xff0c;它将应用程序分为三大组件&#xff1a;模型 (Model)、视图 (View)、控制器 (Con…

素数筛法详解:埃氏筛和欧拉筛

主要讲解怎么判断一个数字是否是素数&#xff1a; 埃式筛 学习埃氏筛之前&#xff0c;我们先看一下暴力筛法&#xff0c;即对每个数都用试除法判断其是不是质数&#xff1a; 暴力筛法&#xff1a; # include <stdio.h>int main() {int st[N]; // 初始化为0&#xff0…

rider 缺少iisexpress

File C:/Program Files (x86)/IIS Express/iisexpress.exe doesn’t exist iisexpress下载 64位系统只能安装64位&#xff0c;32位系统安装32位 安装完成之后就有了

MacBook的nginx出现13: Permission denied 的问题分析和解决办法

同样的项目代码&#xff0c;电脑从Windows更换到了MacBook&#xff0c;发现网站的样式都没有了&#xff0c;直接访问CSS文件 http://crm.ms-test.cc/toolstatic/css/bootstrap.min.css 发现无法访问。查看Nginx错误日志&#xff1a; 说明是nginx没有权限访问这个CSS文件&#…

NDK的log.h使用__android_log_print报错app:buildCMakeDebug[x86_64]

org.gradle.api.tasks.TaskExecutionException: Execution failed for task :app:buildCMakeDebug[x86_64] 重点是 Execution failed for task :app:buildCMakeDebug[x86_64]. 我的代码&#xff1a; #include <android/log.h> #define LOG_TAG "MyJNI" #d…

20240113----重返学习-`nginx/conf/nginx.conf`的https证书配置说明

20240113----重返学习-nginx/conf/nginx.conf的https证书配置说明 文件说明 不同域名的多虚拟主机配置 server {listen 443 ssl;#在443端口上监听SSL/TLS流量;server_name localhost;#指定服务器名称&#xff0c;应该与域名匹配;ssl_certificate fangchaoduan.com.pem;#指定SS…

Linux之ACL权限管理

文章目录 1.ACL权限介绍二、操作步骤1. 添加测试目录、用户、组&#xff0c;并将用户添加到组2. 修改目录的所有者和所属组3. 设定权限4. 为临时用户分配权限5. 验证acl权限6. 控制组的acl权限 1.ACL权限介绍 每个项目成员有一个自己的项目目录&#xff0c;对自己的目录有完全…

DSL Query基本语法

DSL Query基本语法 查询的基本语法如下&#xff1a; GET /indexName/_search {"query":{"查询类型":{"查询条件":"条件值"}} }查询所有 GET /indexName/_search {"query":{"match_all":{}} }match查询&#xf…

【高德地图】Android高德地图绘制标记点Marker

&#x1f4d6;第4章 Android高德地图绘制标记点Marker ✅绘制默认 Marker✅绘制多个Marker✅绘制自定义 Marker✅Marker点击事件✅Marker动画效果✅Marker拖拽事件✅绘制默认 Infowindow&#x1f6a9;隐藏InfoWindow 弹框 ✅绘制自定义 InfoWindow&#x1f6a9;实现 InfoWindow…