springBoot集成emqx 实现mqtt消息的发送订阅

介绍

我们可以想象这么一个场景,我们java应用想要采集到电表a的每小时的用电信息,我们怎么拿到电表的数据?一般我们会想 直接 java 后台发送请求给电表,然后让电表返回数据就可以了,事实上,我们java应用发送请求请求电表的数据信息并不是发到电表上,而是发送到 服务端 (broker)上,请求服务器 给我们电表的信息,而电表会把数据 按照mqtt协议 源源不断的发送到服务端,服务端可以把数据存储到物联网数据库上,也可以由我们java应用手动存储到物联网数据库上

而我们怎么知道电表发送到服务端的哪里,java应用又怎么请求到该电表发送的位置?

这就 引出了 一个概念,主题 (topic) ,这个topic在mqtt中不需要手动的创建,只要又客户端订阅或者发布消息,主题就会被自动创建出来

而我们服务端用的最多的就是 集成好的emqx服务器,本文我们也用的是集成好的emqx的服务端

,我们先是 一个电表 订阅好一个固定的主题,然后 源源不断的往服务端发消息,然后我们java应用订阅这个主题,这样 java应用就能持续的拿到电表的数据了

具体什么是主题,主题怎么设置的 ,mqtt协议的具体协议内容,直接登录emqx官网查看即可

MQTT 最全教程:从入门到精通 | EMQ

而emqx服务器是怎么在linux系统上搭建的呢,具体直接看文档即可,输入文档对应的yum命令就可以直接 在linux服务器上安装了

在 CentOS/RHEL 上安装 EMQX | EMQX文档

文本主要书写代码的实现

代码实现

我们的yml文件如下

 我们后续 java应用订阅消息 都要到服务端 emqx 的1883端口

实体类如下

 

这里解释一下 clientid 是不固定的,随机的每一个发布/订阅消息的客户端都有一个唯一的clientid

而username 和password 是 客户端连接到 服务端的认证账户,多个客户端可以使用一个 账号密码

客户端代码实现

@Slf4j
@Component
@RequiredArgsConstructor
public class EMQXClient {
private  final MqttDefaultProperties mqttDefaultProperties;
private  final IMessageCallbackImpl mqttCallback;
    private IMqttClient mqttClient;

    /**
     * 初始化客户端对象
     */
public  boolean initMqttClient(String clientId,String serverUrl)  {
    MemoryPersistence memoryPersistence = new MemoryPersistence();
    try {
        if(Objects.isNull(clientId)){
           clientId= mqttDefaultProperties.getDefaultClientId();
        }
        if(Objects.isNull(serverUrl)){
            serverUrl= mqttDefaultProperties.getServerUrl();
        }
       mqttClient = new MqttClient(serverUrl, clientId, memoryPersistence);
    } catch (MqttException e) {
        log.info("mqtt创建异常:{}", e.getMessage());
        return  false;
    }
    return true;
}
    public  boolean initMqttClient()  {
        MemoryPersistence memoryPersistence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttDefaultProperties.getServerUrl(),mqttDefaultProperties.getDefaultClientId(),  memoryPersistence);
        } catch (MqttException e) {
            log.info("mqtt创建异常:{}", e.getMessage());
            return  false;
        }
        return true;
    }

    /**
     * 获取连接
     * @return
     */
    public   boolean connect()  {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        //当客户端会话关闭的时候  对应的broker也关闭
        mqttConnectOptions.setCleanSession(true);
        //自动重连
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(mqttDefaultProperties.getDefaultUserName());
        mqttConnectOptions.setPassword(mqttDefaultProperties.getDefaultPassword().toCharArray());
        mqttClient.setCallback(mqttCallback);
        try {
            mqttClient.connect(mqttConnectOptions);
        } catch (MqttException e) {
            log.info("客户端连接异常:{}",e.getMessage());
            return  false;
        }
        return true;
    }
    public   boolean connect(String username,String password)  {
    if(Objects.isNull(username)){
        username=mqttDefaultProperties.getDefaultUserName();
    }
    if(Objects.isNull(password)){
        password= mqttDefaultProperties.getDefaultPassword();
    }
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    //当客户端会话关闭的时候  对应的broker也关闭
    mqttConnectOptions.setCleanSession(true);
    //自动重连
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setUserName(username);
    mqttConnectOptions.setPassword(password.toCharArray());
    mqttClient.setCallback(mqttCallback);
    try {
        mqttClient.connect(mqttConnectOptions);
    } catch (MqttException e) {
        log.info("客户端连接异常:{}",e.getMessage());
     return  false;
    }
    return true;
}

    /**
     * 断开连接
     * @return
     */
    public  boolean disConnect(){
    try {
        mqttClient.disconnect();
    } catch (MqttException e) {
        log.info("客户端断开连接异常:{}",e.getMessage());
        return  false;
    }
return true;
}

    /**
     *
     * @param topic 主题
     * @param msg 消息内容
     * @param qosEnum
     * @param retain  新的订阅者来了是否能拿到之前的 最新的一次消息
     * @return
     */
    public  boolean publish(String topic, String msg, QosEnum qosEnum,boolean retain){
        int uniqueInt = (int) (System.nanoTime() & 0xFFFFFFFFL);//取纳秒时间戳低32位
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qosEnum.getType());
        mqttMessage.setRetained(retain);
        mqttMessage.setId(uniqueInt);
        try {
            mqttClient.publish(topic,mqttMessage);
        } catch (MqttException e) {
            log.info("客户端发送消息失败:{}",e.getMessage());
            return  false;
        }
        return  true;
    }

    /**
     *
     * @param topicFilters 要订阅的主题 例子 testtopic/#
     * @param qosEnum
     * @return
     */
    public boolean subscribe(String topicFilters,QosEnum qosEnum){
        try {
            mqttClient.subscribe(topicFilters,qosEnum.getType());
        } catch (MqttException e) {
            log.info("订阅主题失败:{}",e.getMessage());
            return  false;
        }
        return true;
    }
public boolean unSubscribe(String topicFilter){
    try {
        mqttClient.unsubscribe( topicFilter);
    } catch (MqttException e) {
        log.info("取消订阅主题失败:{}",e.getMessage());
        return false;
    }
    return  true;
}

}

我们着重关注的是

我们想连接 服务端 是不是得有 一个client ,那这个client就对应IMqttclient

,我们java应用客户端连接上服务端之后,是不是得订阅主题,订阅之后的逻辑在哪里,就在

IMessageCallbackImpl

这里面就是 书写的 客户端收到服务端发来的消息之后的处理情况

@Slf4j
@Component
public class IMessageCallbackImpl  implements MessageCallback {

    @Override
    public void connectionLost(Throwable cause) {
        //丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连 或者记录 日志之类的
        log.info("丢失了对broker的连接");
    }

    /**
     * 订阅到消息后的回调
     * 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
     * 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的
     消息都将由broker服务器再次发送到客户端
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));

    }
    /**
     * 消息发布完成且收到ack确认后的回调
     * QoS0:消息被服务端发出后触发一次
     * QoS1:当收到broker的PUBACK消息后触发
     * QoS2:当收到broer的PUBCOMP消息后触发
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发送完成,messageId={},topics={}",messageId,topics);
    }
}

 

我们用一个 bean 在初始化的时候就订阅一个主题,这样 只要有 客户端往主题上发消息,我们就能收到了

而我们这个时候 没有硬件,怎么办呢,很简单,直接下载一个mqttx 模拟硬件发送消息到主题,启动springboot,就能看到消息的发送与接收了

当然 这实现的紧紧是最简单的协议的发送接收,后面还有许多的高级功能等我们使用,具体的可以查阅官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/981449.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装milvus及其基本使用说明

简介 Milvus 是一款开源的高性能、高可用的向量数据库,专为大规模机器学习和深度学习应用设计,旨在高效管理和检索高维向量数据。随着AI技术的飞速发展,向量数据库在图像识别、语音识别、自然语言处理、推荐系统等领域扮演着越来越重要的角色…

ssm_mysql_小型企业人事管理系统

收藏关注不迷路!! 🌟文末获取源码数据库🌟 感兴趣的可以先收藏起来,还有大家在毕设选题(免费咨询指导选题),项目以及论文编写等相关问题都可以给我留言咨询,希望帮助更多…

25年第四本【认知觉醒】

《认知觉醒》:一场与大脑的深度谈判 在信息爆炸的焦虑时代,我们像被抛入湍流的溺水者,拼命抓取各种自我提升的浮木,却在知识的漩涡中越陷越深。这不是一本简单的成功学指南,而是一场关于人类认知系统的深度对话&#…

盘古信息携手艾罗能源启动IMS数字化智能制造工厂项目,共筑新能源行业数字化标杆

在政策扶持下成长的新能源行业,如今已逐步进入商业化阶段。相比传统制造行业,新能源行业离散度高、自动化程度高。 面对迅疾的市场变化,在大环境中一枝独秀的新能源行业,亟需突破传统架构的限制,通过构建智能化生产体…

32.C++二叉树进阶1(二叉搜索树)

⭐上篇文章:31.C多态4(静态多态,动态多态,虚函数表的存储位置)-CSDN博客 ⭐本篇代码:c学习/18.二叉树进阶-二叉搜索树 橘子真甜/c-learning-of-yzc - 码云 - 开源中国 (gitee.com) ⭐标⭐是比较重要的部分…

图论基础算法: 二分图的判定(C++)

二分图的基本概念 什么是二分图? 二分图(Bipartite Graph)是指一个图的顶点集可以被分割为两个互不相交的子集 U U U 和 V V V, 并且图中的每一条边都连接 U U U 中的一个顶点和 V V V 中的一个顶点. 换句话说, 二分图中的顶点可以被分成两组, 组内的顶点之间没有边相连…

pyside6学习专栏(九):在PySide6中使用PySide6.QtCharts绘制6种不同的图表的示例代码

PySide6的QtCharts类支持绘制各种型状的图表,如面积区域图、饼状图、折线图、直方图、线条曲线图、离散点图等,下面的代码是采用示例数据绘制这6种图表的示例代码,并可实现动画显示效果,实际使用时参照代码中示例数据的格式将实际数据替换即可…

爬虫逆向实战小记——解决webpack实记

注意!!!!某XX网站实例仅作为学习案例,禁止其他个人以及团体做谋利用途!!! aHR0cHM6Ly9wbW9zLnhqLnNnY2MuY29tLmNuOjIwMDgwL3B4Zi1zZXR0bGVtZW50LW91dG5ldHB1Yi8jL3B4Zi1zZXR0bGVtZW5…

Hive-优化(语法优化篇)

列裁剪与分区裁剪 在生产环境中,会面临列很多或者数据量很大时,如果使用select * 或者不指定分区进行全列或者全表扫描时效率很低。Hive在读取数据时,可以只读取查询中所需要的列,忽视其他的列,这样做可以节省读取开销…

【三维生成】StarGen:基于视频扩散模型的可扩展的时空自回归场景生成

标题:《StarGen: A Spatiotemporal Autoregression Framework with Video Diffusion Model for Scalable and Controllable Scene Generation》 项目:https://zju3dv.github.io/StarGen 来源:商汤科技、浙大CAD、Tetras.AI 文章目录 摘要一、…

vue2(笔记)4.0vueRouter.声明式/编程式导航以及跳转传参.重定向

---vueRouter 五个步骤: 两个核心: {path:路径,component:组件} 二级路由: 1.在主页路由对象中,添加children配置项 2.准备路由出口 示例代码: {path: /,component: layout,redirect: home,children: [{path: /home,component: home},{path: /card,component: card}]}, 在l…

内网渗透信息收集linuxkali扫描ip段,收集脚本(web安全)

内网ip段扫描↓ 工具1↓ nmap -sn 192.168.128.0/24工具2↓ nbtscan 192.168.128.0/24 工具↓3 arp-scan -t 1000 192.168.128.0/24 cmd命令扫描↓ for /L %I in (1,1,255) Do ping -w 1 -n 1 192.168.128.%I | findstr "TTL" 这个命令在Windows命令提示符下使…

DeepSeek崛起:如何在云端快速部署你的专属AI助手

在2025年春节的科技盛宴上,DeepSeek因其在AI领域的卓越表现成为焦点,其开源的推理模型DeepSeek-R1擅长处理多种复杂任务,支持多语言处理,并通过搜索引擎获取实时信息。DeepSeek因其先进的自然语言处理技术、广泛的知识库和高性价比…

python-leetcode 48.二叉树的最近公共祖先

题目: 给定一个二叉树,找到该树中两个指定节点的最近公共祖先 百度百科中最近公共祖先的定义为:“对于有根树 T 的两个节点 p、q,最近公共祖先表示为一个节点 x,满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff0…

示例:在WPF中如何使用Segoe MDL2 Assets图标和使用该图标的好处

一、目的:分享在WPF中如何使用Segoe MDL2 Assets图标和使用该图标的好处 在WPF中使用Segoe MDL2 Assets字体,可以通过设置控件的FontFamily属性来实现。Segoe MDL2 Assets是一个包含许多图标的字体,通常用于Windows应用程序的图标显示。 二、…

QT——基于 QListWidget 和 QStackedWidget 的页面切换

Qt 练习题:基于 QListWidget 和 QStackedWidget 的页面切换 Qt 练习题:基于 QListWidget 和 QStackedWidget 的页面切换 题目描述: 请使用 Qt 设计一个窗口,其中包含一个 QListWidget 和一个 QStackedWidget。要求实现以下功能&a…

Docker概念与架构

文章目录 概念docker与虚拟机的差异docker的作用docker容器虚拟化 与 传统虚拟机比较 Docker 架构 概念 Docker 是一个开源的应用容器引擎。诞生于 2013 年初,基于 Go 语言实现。Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xf…

linux server docker 拉取镜像速度太慢或者超时的问题处理记录

已经按网上的帖子将镜像地址改为国内的了,用docker info命令查看,如下图所示: 但是还存在下载镜像特别卡的问题,而不是直接报错了,如下图所示: 甚至已经连续下载一晚上了,还是卡在这里,不见任何下载进展。 我在window的docker中下载了对应的镜像,并用以下语句生成了…

四十二:VSCODE打开新文件覆盖上一个文件窗口问题

VSCODE打开新文件覆盖上一个文件窗口问题_vscode enablepreview-CSDN博客

shell文本处理

shell文本处理 一、grep ​ 过滤来自一个文件或标准输入匹配模式内容。除了 grep 外,还有 egrep、fgrep。egrep 是 grep 的扩展,相当于 grep -E。fgrep 相当于 grep -f,用的比较少。 用法 grep [OPTION]... PATTERN [FILE]...支持的正则描述…