SpringBoot集成Mqtt发送消息

1. MQTT简介

MQTT是一种物联网消息协议,为Message Queuing Telemetry Transport的缩写,即消息队列传输探测,协议基于发布订阅模式进行通信,有开销低、带宽小、轻量的特点,通常应用在物联网数据采集、移动应用、智能硬件、电力、能源等领域。

相关概念

三种身份:

在这里插入图片描述

  • 客户端(Client):MQTT 客户端是发送接收消息的应用程序。
  • 服务器(Broker):也叫“代理”,服务器是处理消息的应用程序,位于发布者和订阅者中间,负责接收消息,并按照某种规则发送给订阅者。
  • 主题(Topic): 主题是消息的标识符,用于区分不同类型的消息。

MQTT 消息

MQTT传输的消息可以分为:主题(topic)和负载(payload)两部分

  • 主题,可以理解为消息的类型
  • 负载,可以理解为消息的内容

消息服务质量QoS(Quality of Service)

Qos用于保证在不同的网络环境下消息传递的可靠性,分为3个等级

  • 0 消息最多传递一次,消息发布完全依赖底层TCP/IP网络,可能会发生消息丢失, 也就是发出去就不管了,也被叫做“即发即弃”
  • 1 消息传递至少 1 次,确保消息到达,但消息重复可能会发生,发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
  • 2 消息仅传送一次,确保消息到达一次

2. SpringBoot集成Mqtt

Spring集成Mqtt常用的有两种方式,一种是直接使用Mqtt的客户端库,如Eclipse Paho,另外一种是spring integration mqtt
第一种:使用Mqtt客户端库
依赖引入:org.eclipse.paho.client.mqttv3

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.0</version>
</dependency>

服务端配置

public class MqttSendMsgService {
    private static String clientId = "test";
    private static String username = "admin";
    private static String password = "xxxxxx";
    private static String broker = "tcp://xxxxx:1883";
    public ReturnT<String> mqttSend(String param) {
        MqttClient client;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }
                @Override
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("Message arrived: " + mqttMessage.getPayload());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("Delivery complete");
                }
            });

            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());

            client.connect(connOpts);
            log.info("Connected to MQTT Broker!");


            //主题
            String topic="test/simple";
            //消息
            String content="发送测试";

            MqttMessage message = new MqttMessage();
            message.setQos(1);
            message.setRetained(false);
            message.setPayload(content.getBytes());
            //消息发送
            client.publish(topic,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return ReturnT.SUCCESS;
    }
}

上面这种使用起来比较简单,生产环境使用最多的还是下面这种

第二种:使用 Spring integration进行集成,这里以发送消息为例
依赖引入

<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-mqtt</artifactId>
	<version>5.5.14</version>
</dependency>

添加yaml配置

mqtt.url = tcp://xxxxx:1883
mqtt.username = admin
mqtt.password = 123456
mqtt.clientId = test
mqtt.defaultTopic = /test/send
mqtt.keepAliveInterval = 60
mqtt.automaticReconnect = true
mqtt.cleanSession = false
mqtt.connectionTimeout = 30
mqtt.maxInflight = 1024

添加对应的属性配置类

@Component
public class MqttConfigProperties {
    @Value("${mqtt.url}")
    private String url;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password}")
    private String password;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.defaultTopic}")
    private String defaultTopic;
    @Value("${mqtt.keepAliveInterval}")
    private Integer keepAliveInterval;
    @Value("${mqtt.automaticReconnect}")
    private Boolean automaticReconnect;
    @Value("${mqtt.cleanSession}")
    private Boolean cleanSession;
    @Value("${mqtt.connectionTimeout}")
    private Integer connectionTimeout;
    @Value("${mqtt.maxInflight}")
    private Integer maxInflight;
}

创建客户端配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {
    @Autowired
    private MqttConfigProperties mqttConfigProperties;

    @Bean
    public MqttConnectOptions mqttConnectOptions() {
        log.info("初始化mqtt信息{}", JSON.toJSON(mqttConfigProperties));
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttConfigProperties.getUsername());
        options.setPassword(mqttConfigProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigProperties.getUrl()});
        options.setKeepAliveInterval(mqttConfigProperties.getKeepAliveInterval());
        options.setAutomaticReconnect(mqttConfigProperties.getAutomaticReconnect());
        options.setCleanSession(mqttConfigProperties.getCleanSession());
        options.setConnectionTimeout(mqttConfigProperties.getConnectionTimeout());
        options.setMaxInflight(mqttConfigProperties.getMaxInflight());
        return options;
    }
    @Bean
    public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    // 推送通道
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler sendHandler(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigProperties.getClientId() + "-publish", mqttPahoClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(1);
        messageHandler.setDefaultTopic(mqttConfigProperties.getDefaultTopic());
        log.info("初始化mqttOutputChannel...");
        return messageHandler;
    }


}

发送网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    /**
     * 发送消息
     *
     * @param topic
     * @param data
     */
    void send(@Header(MqttHeaders.TOPIC) String topic, String data);
}

这样,在发送消息时,直接将消息网关注入,调用发送方法就可以发送了

mqttGateway.send(topic, JSONObject.toJSONString(msg));

参考:
https://mqtt.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/402817.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C/C++的内存管理(1)

内存管理 C与C的内存分布C语言中动态内存管理方式回顾C内存管理的方式 C与C的内存分布 我们学习C语言时就知道&#xff0c;储存不同的变量计算机会相应分配不同区块的内存。那为什么要把内存化为不同的区域呢&#xff1f;实质上是为了方便管理 下面我们来看看下面一道例题&…

【Hudi】Upsert原理

17张图带你彻底理解Hudi Upsert原理 1.开始提交&#xff1a;判断上次任务是否失败&#xff0c;如果失败会触发回滚操作。然后会根据当前时间生成一个事务开始的请求标识元数据。2.构造HoodieRecord Rdd对象&#xff1a;Hudi 会根据元数据信息构造HoodieRecord Rdd 对象&#xf…

记录解决uniapp使用uview-plus在vue3+vite+ts项目中打包后样式不能显示问题

一、背景 从 vue2uview1 升级到 vue3vitetsuview-plus ,uview组件样式打包后不显示&#xff0c;升级前uview 组件是可以正常显示&#xff0c;升级后本地运行是可以正常显示&#xff0c;但是打包发布成H5后uview的组件无法正常显示&#xff0c;其他uniapp自己的组件可以正常显示…

指针笔试题(C语言进阶)

目录 前言 1、案例一 1.1 答案 1.2 解析 2、案例二 2.1 答案 2.2 解析 3、案例三 3.1 答案 3.2 解析 4、案例四 4.1 答案 4.2 解析 5、案例五 5.1 答案 5.2 解析 总结 前言 “纸上得来终觉浅&#xff0c;绝知此事要躬行”。本篇通过对指针实际案例的分析&…

Java基于SpringBoot的校园轻博客系统,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

NLP_构建GPT模型并完成文本生成任务

文章目录 搭建GPT模型&#xff08;解码器&#xff09;构建文本生成任务的数据集训练过程中的自回归文本生成中的自回归&#xff08;贪婪搜索&#xff09;完整代码小结 搭建GPT模型&#xff08;解码器&#xff09; GPT 只使用了 Transformer的解码器部分&#xff0c;其关键组件…

中医笔记(阴阳,五行,十二经脉,天干地支,子午流注,倪海厦中医笔记)

目录 一.阴阳1.1 什么是阴阳&#xff1f;1.2 作用1.3 阴阳理论在中医上的运用 二.五行2.1 五行之间的关系2.2 五行对应的力量2.3 原理&#xff1a; 三.天干地支四.子午流注十二经脉与子午流注之间的关系 五.十二经脉足太阳膀胱经 六.中医笔记小肠是火气化膀胱的水&#xff08;如…

java效率为什么比c/c++慢,蓝桥杯上java只得50分,c++通过?

java效率为什么比c/c慢,蓝桥杯上java只得50分&#xff0c;c通过&#xff1f; 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「c的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大…

车载测试,检测项目标准

检测项目&#xff1a; 二.GB/T 31486-2015电动汽车用动力蓄电池电性能要求及试验方法 说明&#xff1a;本标准规定了电动汽车用动力蓄电池(以下简称蓄电池)的 电性能要求、试验方法、检验规则。本标准适用于装载在电动汽车 上的锂离子蓄电池和金属氢化 物镍蓄电池单体和模块&a…

跟着pink老师前端入门教程(JavaScript)-day05

六、语句 &#xff08;一&#xff09;表达式和语句 1、表达式 表达式是可以被求值的代码&#xff0c;JavaScript 引擎会将其计算出一个结果。 2、语句 语句是一段可以执行的代码。 比如&#xff1a; prompt() 可以弹出一个输入框&#xff0c;还有 if语句 for 循环语句等…

创建型设计模式 - 原型设计模式 - JAVA

原型设计模式 一 .简介二. 案例三. 补充知识 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 一 .简介 原型模式提供了一种机制&#xff0c;可以将原始对象复制到新对象&#xff0…

Vue3_基础使用_3_Hooks模块化

今天主要学习的是hooks, vue3的使用比vue2方便很多了&#xff0c;但是呢各个功能块的逻辑有时候还是会缠绕在一起&#xff0c;这个时候使用hooks进行模块化管理开发&#xff0c;说白了就是将每个单独的业务放到自己的.ts中去写&#xff0c;以后修改就找到这个ts 不用到处去翻…

第三百六十一回

文章目录 1. 概念介绍2. 实现方法2.1 环绕效果2.2 立体效果 3. 示例代码4. 内容总结 我们在上一章回中介绍了"自定义SlideImageSwitch组件"相关的内容&#xff0c;本章回中将介绍两种阴影效果.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在本…

HL小祭記0221

早上很好&#xff0c;浑身酸疼&#xff0c;像被人*了 上午将字符串 一言难尽 中午天有点小雨 炸金花 额 潇寞手麻了&#xff0c;好快啊&#xff01; 靠开牌小赚一下 下午调题 动不动就一百行代码…… 小雨&#xff0c;中雨&#xff0c;大雨&#xff0c;电闪雷鸣 是不…

代码随想录算法训练营第58天 | 392.判断子序列 115.不同的子序列

判断子序列 这道题可以双指针方法解决。 class Solution { public:bool isSubsequence(string s, string t) {int s_index 0;for(int t_index 0; t_index < t.size(); t_index) {if(s[s_index] t[t_index]) {s_index;}}return s_index s.size();} };用动态规划也是可解…

Shader基础的简单实现(基于URP渲染)

一个模型是很多个顶点组成&#xff0c;顶点数据中包含坐标、法线、切线、UV坐标、顶点颜色等等组成。 URP(Universal Render Pipeline)通用渲染管线&#xff0c;是Unity在2019.3版本之后推出的一种新的渲染管线。传统的渲染管线在渲染多光源的情况&#xff0c;是把每一个主要光…

13款强大的开源API测试工具,不容错过!

使用SOA和微服务作为软件架构的趋势不断上升&#xff0c;催生了多种用于服务API自动化测试的工具。 API是应用程序接口&#xff08;application programming interface&#xff09;的缩写&#xff0c;是一套用于构建和集成应用软件的定义和协议&#xff0c;是两个或多个计算机…

2024新版Java高频面试题+Java八股文面试真题

Java面试题_2024新版Java高频面试题Java八股文面试真题 Java高频面试专题视频课程&#xff0c;瓤括了Java生态下的主流技术面试题&#xff0c;课程特色&#xff1a; 1、全面&#xff0c;jvm、并发编程、mysql、rabbitmq、spring、mybatis、redis、分布式、微服务、数据结构等等…

解决 ModuleNotFoundError: No module named ‘transformers‘

Traceback (most recent call last): File “start_cli_test.py”, line 2, in import transformers ModuleNotFoundError: No module named ‘transformers’ Traceback (most recent call last): File “/usr/local/python3/lib/python3.8/runpy.py”, line 185, in _run_m…

OpenCascade——BRepPrimAPI图元创建接口

OpenCascade BRepPrimAPI包提供了创建以下图元&#xff08;primitive&#xff09;的 API&#xff1a; 盒;锥体;柱体;棱镜。 可以创建部分实体&#xff0c;例如一定经度范围内的球体。在实际模型中&#xff0c;图元可用于轻松创建特定的子部件。 BRepPrimAPI也提供了扫掠方式…