springboot+mqtt使用总结

1.软件的选型

1.1.使用免费版EMQX

1.1.1.下载

百度搜索的目前是会打开官网,这里提供下免费版的使用链接EMQX使用手册

文档很详细,这里不再记录了。

1.2.使用rabbitmq

rabbitmq一般做消息队列用,作为mqtt用我没有找到详细资料,这里总结下使用方法:

1.window安装rabbitmq

首先安装rabbitmq得依赖,也就是opt_win64_24.0.exe,然后傻瓜式安装接可
安装完毕,进入安装目录下,sbin文件夹

1.浏览器查看插件 执行命令
rabbitmq-plugins enable rabbitmq_management

回车,浏览器输入http://127.0.0.1:15672/#/看到此页面及安装成功,默认账号密码均是 guest

2.注意:如果做mqtt使用的话,需安装mqtt插件 安装命令
rabbitmq-plugins enable rabbitmq_mqtt
执行完命令,在浏览器上查看 mqtt及其端口号出现了的话,就证明安装成功,下面就可以开始整合了

2.linux安装rabbitmq

以前公司都是用window服务器,没用过linux,折腾了好久,安装 erlang与rabbitmq不对应 不是最新 等等一系列问题,最后看了一个视频 用 dock安装 根据官网
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
一句话就可以安装
如果后期需要安装插件
docker exec <容器id> rabbitmq-plugins enable rabbitmq_mqtt
ps:查看容器id 方法
1.使用docker ps -aqf “name=containername” -------简短容器id
2.docker inspect --format="{{.Id}}" container_name -------详情容器id

带密码启动dock
docker run -it --rm --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=密码 -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbitmq:management


15672 是rabbitmq management管理界面默认访问端口
5672 是amqp默认端口
1883 是mqtt tcp协议默认端口
15675 是web_mqtt ws协议默认端口

2.springboot集成mqtt

2.1:yml文件集成配置

iot:
  mqtt:
    clientId: mqttClientOutputId
    sendTopic: ktcotrl/dy/#
    topics:
      - /ktcotrl/#
      - gateway/#    
    default:
      topic: "/ktcotrl/dy/*****"
      qos: 1
      receive:
        enable: true
    serverClientId: mqttClientInputId
    servers: tcp://ip:1883
    username: username
    password: password

2.2:主要代码


@Slf4j
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;


    /*
     *
     *  MQTT连接器选项
     * *
     */
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
                        mqttClientFactory(),
                        mqttConfig.getTopics().toArray(new String[0]));
//                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

  

 
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {

                String topic= (String) message.getHeaders().get("mqtt_receivedTopic");
//                 msgid= message.getHeaders().get("id");
                 String messageContents= message.getPayload().toString();
              //操作
            }
        };
    }





    @Bean
    public MessageChannel defaultMqttInputChannel() {
        return new DirectChannel();
    }

    @Value("${iot.mqtt.default.topic}")
    private String defaultTopic;
    /**
     * 说明:
     * ConditionalOnProperty(value = "driver.mqtt.default.receive.enable")
     * 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
     *
     * @return
     */
//    @Bean
//    @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
//    public MessageProducer defaultInbound() {
//        MqttPahoMessageDrivenChannelAdapter adapter =
//                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(),
//                        mqttClientFactory(),
//                        defaultTopic);
//        adapter.setCompletionTimeout(5000);
//        adapter.setConverter(new DefaultPahoMessageConverter());
//        adapter.setQos(2);
//        adapter.setOutputChannel(defaultMqttInputChannel());
//        return adapter;
//    }

    /**
     * 说明:
     * ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
     * 根据配置属性driver.mqtt.default.receive.enable选择是否开启 Default Topic 主题的数据接收逻辑
     *
     * @return
     */
//    @Bean
//    @ServiceActivator(inputChannel = "defaultMqttInputChannel")
//    @ConditionalOnProperty(value = "iot.mqtt.default.receive.enable")
//    public MessageHandler defaultHandler() {
//
//        return message -> {
//            log.info(
//                    "defaultTopicReceiver\nheader:{},\npayload:{}",
//                    JSON.toJSONString(message.getHeaders(), true),
//                    JSON.toJSONString(message.getPayload(), true)
//            );
//        };
//    }
}
@Getter
@Setter
@Component
@IntegrationComponentScan
@ConfigurationProperties(prefix = "iot.mqtt")
public class MqttConfig {
    /*
     *
     * 服务地址
     */

    private String servers;

    /**
     * 客户端id
     */


    private String clientId;
/*
*
     * 服务端id
     */

    private String serverClientId;
/*
*
     * 默认主题
     */

    private String[] defaultTopic;

    private String sendTopic;
    /*
     *
     * 用户名和密码*/


    private String username;

    private String password;

    private List<String> topics;
}
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class IotMqttSendConfig {
    @Autowired
    private MqttConfig mqttConfig;

    /**
     * 将channel绑定到MqttClientFactory上
     * ServiceActivator 表明当前方法用于处理Mqtt消息,inputChannel用于接收消息的通道
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttConfig.getUsername());
        mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getServers()});
        mqttConnectOptions.setKeepAliveInterval(2);
        factory.setConnectionOptions(mqttConnectOptions);
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(mqttConfig.getServerClientId(), factory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultRetained(false);
        messageHandler.setDefaultTopic(mqttConfig.getSendTopic());
        return messageHandler;
    }

    /* 发布者 */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}
@RestController
@RequestMapping("/path")
@Slf4j
public class WkqController {

    @Autowired
    private IotMqttGateway mqttGateway;
    @RequestMapping("/test")
    @ResponseBody
    public void test() {
       
      //topic:主题
        mqttGateway.sendMessage2MqttHex( topic,1, "sendStr");
    }
   
/**
 * @description rabbitmq mqtt协议网关接口
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface IotMqttGateway {

    void sendMessage2Mqtt(String data);

    void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.QOS) int qos, String payload);
    void sendMessage2MqttHex(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.QOS) int qos, byte[] payload);
    void sendMessage3Mqtt(@Header(MqttHeaders.TOPIC) String topic,
                          @Header(MqttHeaders.RECEIVED_TOPIC)String revicetopic,
                          @Header(MqttHeaders.QOS) int qos, String payload);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/690381.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[AIGC] SpringBoot的自动配置解析

下面是一篇关于SpringBoot自动配置的文章&#xff0c;里面包含了一个简单的示例来解释自动配置的原理。 SpringBoot的自动配置解析 Spring Boot是Spring的一个子项目&#xff0c;用于快速开发应用程序。它主要是简化新Spring应用的初始建立以及开发过程。其中&#xff0c;自动…

武汉理工大学 云计算与服务计算 期末复习

云计算与的定义 长定义是&#xff1a;“云计算是一种商业计算模型。它将计算任务分布在大量计算机构成的资源池上&#xff0c;使各种应用系统能够根据需要获取计算力、存储空间和信息服务。” 短定义是&#xff1a;“云计算是通过网络按需提供可动态伸缩的廉价计算服务。 云计…

批量转换更高效:一键修改TXT后缀名转DOCX,轻松实现文件高效管理!

在日常生活和工作中&#xff0c;我们经常需要处理大量的文件&#xff0c;而文件格式的转换和管理往往是其中一项繁琐的任务。特别是当需要将大量的TXT文件转换为DOCX格式时&#xff0c;传统的逐个手动操作不仅效率低下&#xff0c;还容易出错。然而&#xff0c;现在有了我们这款…

【ArcGIS微课1000例】0117:ArcGIS中如何将kml(kmz)文件转json(geojson)?

文章目录 一、kml获取方式二、kml转图层三、图层转json一、kml获取方式 kml文件是一种很常用的数据格式,可以从谷歌地球(googleearth)获取某一个地区的kml范围文件,如青海湖(做好的kml文件可以从配套实验数据包0117.rar中获取)。 二、kml转图层 打开【KML转图层】工具,…

在线渲染3d怎么用?3d快速渲染步骤设置

在线渲染3D模型是一种高效的技术&#xff0c;它允许艺术家和设计师通过互联网访问远程服务器的强大计算能力&#xff0c;从而加速渲染过程。无论是复杂的场景还是高质量的视觉效果&#xff0c;在线渲染服务都能帮助您节省宝贵的时间。 在线渲染3D一般选择的是&#xff1a;云渲染…

数据结构之初识泛型

目录&#xff1a; 一.什么是泛型 二.引出泛型 三.泛型语法及&#xff0c;泛型类的使用和裸类型(Raw Type) 的了解 . 四.泛型的编译&#xff1a; 五.泛型的上界 六.泛型方法 注意&#xff1a;在看泛型之前可以&#xff0c;回顾一下&#xff0c;包装类&#xff0c;包装类就是服务…

【Python预处理系列】深入理解过采样技术及其Python实现

目录 一、过采样简介 二、过采样的实现方法 三、过采样和欠采样是数据增强吗 四、Python实现SMOTE过采样 &#xff08;一) 生成不平衡数据集 &#xff08;二&#xff09; 将数据集转换为DataFrame&#xff0c;便于展示 &#xff08;三) 应用SMOTE算法进行过采样 &…

命令行打包最简单的android项目从零开始到最终apk文件

准备好需要的工具 AndroidDevTools - Android开发工具 Android SDK下载 Android Studio下载 Gradle下载 SDK Tools下载 jdk的链接我就不发出来,自己选择,我接下来用的是8版本的jdk和android10的sdk sdk的安装和环境变量的配置 sdk tool压缩包打开后是这样子,打开sdk mana…

3-1RT-Thread时钟管理

这里写自定义目录标题 时钟节拍是RT thread操作系统的最小时间单位。 第一个功能&#xff0c;rt tick值自动加1&#xff0c;在RT thread当中通过RT_USING_SMP定义了多核和单核的场景。第二个功能&#xff0c;检查当前线程的时间片&#xff0c;首先获取当前线程&#xff0c;将当…

[数据集][目标检测]室内积水检测数据集VOC+YOLO格式761张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;761 标注数量(xml文件个数)&#xff1a;761 标注数量(txt文件个数)&#xff1a;761 标注类别…

【Text2SQL 论文】PET-SQL:用 Cross-Consistency 的 prompt 增强的两阶段 Text2SQL 框架

论文&#xff1a;PET-SQL: A Prompt-enhanced Two-stage Text-to-SQL Framework with Cross-consistency ⭐⭐⭐ arXiv:2403.09732&#xff0c;商汤 & 北大 Code&#xff1a;GitHub 一、论文速读 论文一开始提出了以往 prompt-based 的 Text2SQL 方法的一些缺点&#xff1…

Linux卸载残留MySQL【带图文命令巨详细】

Linux卸载残留MySQL 1、检查残留mysql2、检查并删除残留mysql依赖3、检查是否自带mariadb库 1、检查残留mysql 如果残留mysql组件&#xff0c;使用命令 rpm -e --nodeps 残留组件名 按顺序进行移除操作 #检查系统是否残留过mysql rpm -qa | grep mysql2、检查并删除残留mysql…

[职场] 关于薪酬需要知道的两个知识点 #知识分享#知识分享

关于薪酬需要知道的两个知识点 薪酬问题是面试过程中比较核心的问题&#xff0c;也是每次面试必问的。如果你进入到面试的后一阶段&#xff0c;这类问题可以让面试官或企业判断求职者的要求是否符合企业的薪酬标准&#xff0c;并进一步判断求职者对自身价值的认可程度。关于薪…

设计模式-六大原则

概述 设计模式体现的是软件设计的思想&#xff0c;而不是软件技术&#xff0c;它重在使用接口与抽象类来解决各种问题。在使用这些设计模式时&#xff0c;应该首先遵守六大原则。 原则含义具体方法开闭原则对扩展开放&#xff0c;对修改关闭多使用抽象类和接口里氏代换原则基…

文件属性与目录

一、Linux 系统中的文件类型 Linux 系统中的文件类型 Linux 下一切皆文件&#xff0c;文件作为 Linux 系统设计思想的核心理念。 1、普通文件 普通文件&#xff08; regular file &#xff09;在 Linux 系统下是最常见的&#xff0c;譬如文本文件、二进制文件&#xff0c…

用户输入表格数据设计(XPTable控件使用说明九)

XP Table控件可以编辑数据&#xff0c;程序也可以使用编辑后的数据&#xff0c;但是程序新建时又从初始化数据到模型到显示&#xff0c;这两步有点绕&#xff0c;做了一个实例来说明这块内容。 流程1&#xff1a;初始化数据--> model--> UI show 流程2&#xff1a;UI--…

Vue09-事件处理

一、一个简单的示例 v-on&#xff1a;当xxx的时候。 二、事件处理 2-1、参数说明 <div id"root"><h1>你好呀&#xff0c;{{name}}</h1><button v-on:click"showinfo">点击我</button></div><script>new Vue({e…

css3 都有哪些新属性

1. css3 都有哪些新属性 1.1. 圆角边框 (border-radius)1.2. 盒子阴影 (box-shadow)1.3. 文本阴影 (text-shadow)1.4. 响应式设计相关属性1.5. 渐变背景 (gradient backgrounds)1.6. 透明度 (opacity 和 rgba/hsla)1.7. 多列布局 (column-count, column-gap, etc.)1.8. 变换 (t…

正大国际期货:沪深300股指期货如何设置止损?

简单易懂的止损设置方法&#xff1a; 1. 平衡点止损法&#xff1a; 刚开始投资时&#xff0c;先设定一个止损点&#xff0c;通常在买入价格的1%到30%之间。 如果价格上升&#xff0c;就把止损点移动到你的买入价格&#xff0c;这样即使价格回落&#xff0c;你也不会亏本。 随…

c++使用_beginthreadex创建线程

记录使用_beginthreadex()&#xff0c;来创建线程。方便后期的使用。 创建一个线程 相关函数介绍 unsigned long _beginthreadex( void *security, // 安全属性&#xff0c; 为NULL时表示默认安全性 unsigned stack_size, // 线程的堆栈大小&#xff0c; 一般默认为0 u…