Spring Integration + MQTT

1. 简介

Spring Integration:

Spring Integration是一个开源的Java库,用于构建基于消息的应用程序。它提供了一套丰富的组件和工具,使得开发者可以轻松地开发出可靠、灵活和可扩展的集成解决方案。以下是Spring Integration的一些主要用途:

  1. 企业服务总线(ESB): Spring Integration可以用来构建企业服务总线,它支持各种协议和消息格式,使得不同系统间的数据和事件可以轻松交换。

  2. 消息传递和解耦: 它支持在不同的应用程序组件之间进行异步消息传递,从而降低系统组件间的耦合度。

  3. 事件驱动架构: Spring Integration支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型。

  4. 数据转换和路由: 提供数据转换和路由的功能,可以将数据从一种格式转换为另一种格式,并根据内容将消息路由到不同的目的地。

  5. 错误处理: 它提供了一套完整的错误处理机制,包括重试、补偿和消息存储等策略。

  6. 文件和数据库集成: 可以轻松地与文件系统和数据库进行集成,支持文件传输、数据库操作等场景。

  7. 外部系统适配: 通过提供各种适配器,Spring Integration可以与外部系统(如JMS、AMQP、HTTP、FTP等)进行集成。

  8. 批处理和任务调度: 支持批处理操作和任务调度,可以用于处理大量数据或定时任务。

  9. 模块化和可扩展性: 它的模块化设计使得开发者可以根据需要添加或替换组件,从而构建高度可扩展的系统。

  10. 多环境支持: 支持多种部署环境,包括本地应用、云环境和微服务架构。

  11. 开发和配置的简化: 通过提供声明式的配置和简化的编程模型,Spring Integration降低了开发复杂性,并缩短了开发周期。

  12. 社区和生态系统: 作为Spring家族的一部分,Spring Integration受益于活跃的社区和广泛的生态系统,提供了大量的资源和支持。

Spring Integration + MQTT:

Spring Integration与MQTT的集成是一个非常强大的组合,它允许开发者在Spring应用程序中轻松地实现MQTT协议的消息发布和订阅功能。以下是Spring Integration与MQTT集成的一些主要用途和优势:

  1. 轻量级消息传递: MQTT是一种轻量级的消息传递协议,特别适合带宽有限和延迟敏感的环境,如物联网(IoT)应用。Spring Integration通过提供MQTT通道适配器,使得在Spring应用程序中集成MQTT变得简单直接 。

  2. 简化配置: 通过Spring Integration,开发者可以使用声明式配置来定义MQTT的入站(订阅)和出站(发布)消息通道,而不需要深入了解MQTT客户端库的复杂性 。

  3. 支持MQTT v5: 从Spring Integration 5.5.5版本开始,支持MQTT v5协议,包括对MQTT v5特有的消息属性的支持,如消息过期间隔、响应主题等 。

  4. 灵活的消息处理: Spring Integration提供了强大的消息处理能力,包括消息转换、路由、聚合、分割等,这些都可以通过声明式配置轻松实现 。

  5. 错误处理和重连机制: Spring Integration提供了错误处理机制,包括请求处理建议,例如重试或断路器。同时,支持MQTT的自动重连机制,确保了消息传递的可靠性 。

  6. 与Spring生态系统的集成: 作为Spring家族的一部分,Spring Integration可以很容易地与其他Spring项目(如Spring Boot、Spring Cloud等)集成,提供了与Spring Security、Spring Data等的无缝集成 。

  7. 提高开发效率: Spring Integration的声明式配置和编程模型简化了消息系统开发,降低了开发复杂性,并缩短了开发周期 。

  8. 动态主题管理: Spring Integration允许在运行时动态添加和删除MQTT订阅主题,提供了更高的灵活性 。

  9. 事件驱动架构: 支持事件驱动的架构风格,允许系统对事件做出响应,而不是基于传统的请求-响应模型 。

2. 基本时序架构

        1. 监听到订阅topic有消息流程

        2. 生产者推送一条消息后,中间经过一系列流程后被消费者消费的完整流程

3. 接收消息

通常涉及以下几个步骤:

1. 配置MQTT连接: 首先,需要配置与MQTT代理(如EMQX)的连接。这通常涉及到配置一个MqttPahoClientFactory Bean,它负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(mqttConnectOptions());
    return factory;
}

2. 创建入站通道适配器: 使用MqttPahoMessageDrivenChannelAdapter创建一个入站通道适配器。这个适配器负责从MQTT代理订阅主题,并在接收到消息时将消息发送到Spring Integration的通道。

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttInboundConfiguration {

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Resource(name = ChannelName.INBOUND)
    private MessageChannel inboundChannel;

    /**
     * Clients of inbound message channels.
     * @return
     */
    @Bean(name = "adapter")
    public MessageProducerSupport mqttInbound() {
        MqttClientOptions options = MqttConfiguration.getBasicClientOptions();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                options.getClientId() + "_consumer_" + System.currentTimeMillis(),
                mqttClientFactory, options.getInboundTopic().split(","));
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        // use byte types uniformly
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(1);
        adapter.setOutputChannel(inboundChannel);

        // 添加钩子函数,确保在程序关闭时正确断开连接
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                if (adapter != null) {
                    adapter.stop();
                    log.warn("[consumer] MQTT client stopped successfully.");
                }
            } catch (Exception e) {
                log.error("[consumer] MQTT client stopped with error: {}",e.getMessage(),e);
            }
        }));
        return adapter;
    }

3. 配置消息通道: 配置一个消息通道(如DirectChannel),用于传输从MQTT代理接收到的消息。

@Bean(name = ChannelName.INBOUND)
public MessageChannel inboundChannel() {
    return new ExecutorChannel(threadPool);
}

4. 设置消息监听器: 使用@ServiceActivator注解定义一个服务激活器,它将作为消息监听器处理接收到的消息。这个消息监听器可以是一个方法,这个方法将对通道中的消息进行处理。

5. 处理消息: 实现业务逻辑来处理消息。这通常涉及到从消息中提取数据,并执行所需的操作,例如更新数据库、调用服务或触发事件。

@Bean
@ServiceActivator(inputChannel = ChannelName.INBOUND)
public MessageHandler defaultInboundHandler() {
    return message -> {
    // 处理消息
        // log.info("The default channel does not handle messages." +
        //         "\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
        //         "\nPayload: " + message.getPayload());
    };
}

4. 发布信息

        发送MQTT消息通常是通过配置出站通道适配器(MqttOutboundChannelAdapter)来实现的。这个适配器负责将从Spring Integration通道中传来的消息发布到指定的MQTT主题上。

发送MQTT消息的步骤:

1. 配置MQTT客户端工厂(MqttPahoClientFactory: 这个工厂负责创建和管理MQTT客户端连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(mqttConnectOptions());
    return factory;
}

2. 配置MQTT出站通道适配器(MqttOutboundChannelAdapter: 这个适配器将消息通道中的消息发布到MQTT代理上。

@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttPahoClientFactory mqttClientFactory;

    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
    public MqttOutboundChannelAdapter mqttOutboundAdapter() {
        MqttOutboundChannelAdapter adapter = new MqttOutboundChannelAdapter(
                "client_id", 
                mqttClientFactory, 
                "outputTopic");
        adapter.setQos(1); // 设置服务质量
        adapter.setAsync(true); // 异步发送消息
        return adapter;
    }
}

可以通过setDefaultTopic方法设置默认主题,这样在发送消息时如果没有指定主题,就会使用这个默认主题。

3. 发送消息到消息通道: 通过编程方式或通过其他Spring Integration组件,将消息发送到与MqttOutboundChannelAdapter绑定的消息通道。

@Autowired
private MessageChannel mqttOutboundChannel;

public void sendMqttMessage(String payload) {
    mqttOutboundChannel.send(MessageBuilder.withPayload(payload).build());
}

注:

1. 要确定消息发送到哪一个主题,可以在发送消息时通过消息头MqttHeaders.TOPIC指定。如果没有指定,就会使用在MqttPahoMessageHandler中配置的默认主题。

@Autowired
private MessageChannel mqttOutboundChannel;

public void sendMqttMessage(String topic, String payload) {
    mqttOutboundChannel.send(MessageBuilder.withPayload(payload)
                                         .setHeader(MqttHeaders.TOPIC, topic)
                                         .build());
}

2. 通过使用IMqttMessageGateway接口去发送消息到OUTBOUND通道,再由MqttPahoMessageHandler去处理消息

@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {

    /**
     * Publish a message to a specific topic.
     * @param topic target
     * @param payload   message
     */
    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);

    /**
     * Use a specific qos to push messages to a specific topic.
     * @param topic     target
     * @param payload   message
     * @param qos   qos
     */
    void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/891290.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Webpack 完整指南

​🌈个人主页:前端青山 🔥系列专栏:Webpack篇 🔖人终将被年少不可得之物困其一生 依旧青山,本期给大家带来webpack篇专栏内容:webpack介绍 目录 介绍 一、webpack 1.1、webpack是什么 1.2 webpack五个核心配置 1.…

浏览器服务端文件下载控制(安全阻止、文件浏览器打开还是下载行为控制)

文章目录 简介Chrome已阻止不安全内容下载PDF直接打开txt、xml、js文件被自动打开了而不是下载阿里OSS设置response header阿里OSS修改metadata 简介 随着浏览器的发展,有很多安全方面的限制,对我们的文件下载行为产生了很大的影响。 在JavaScript下载…

云手机:社交平台运营的热门工具

随着互联网的飞速发展,社交平台已经成为企业推广和营销的核心渠道。传统的运营方式已经无法满足高效运营的需求,而云手机作为新兴工具,逐渐成为社交平台运营的前沿趋势。本文将深入分析云手机如何优化社交平台的运营流程,助力企业…

手机中的ip地址是什么意思?可以改手机ip地址吗

‌IP地址,作为手机与网络通信的关键要素,不仅承担着网络通信的基础角色,还涉及网络安全、位置定位以及网络管理等多重功能。了解手机IP地址的含义及其修改方法,对于保护个人隐私、突破网络访问限制等方面具有重要意义。 一、手机I…

如何将mov格式的视频转换mp4?5种解决方法任你选!

MOV即QuickTime影片格式,它是Apple公司开发的一种音频、视频文件格式,用于存储常用数字媒体类型。然而,它的兼容性主要局限于苹果生态系统。有时,我们需要IOS和Mac设备的视频图片保存到安卓手机或Windows系统中,却发现…

国标GB28181软件LiteGBS国标GB28181公网平台分享链接面临不生效该如何解决?

在当今社会,随着科技的飞速发展,各种新兴技术正以前所未有的速度融入我们的日常生活,其中,LiteGBS作为一种基于国标GB28181协议的视频监控平台,正以其独特的功能和广泛的应用领域,深刻地影响着我们的生活方…

MySQL-约束Constraint详解

文章目录 约束简介非空约束检查约束唯一约束列级约束与表级约束给约束起名字 主键约束主键概念以及注意事项 外键约束外键概念以及注意事项外键使用场景约束的删除与添加级联相关操作级联删除(on delete cascade)级联更新(on update cascade)级联置空(on delete set null) 约束…

使用js和canvas实现简单的网页打砖块小游戏

玩法介绍 点击开始游戏后,使用键盘上的←→控制移动,小球会不停移动,板子触碰小球时会反弹,碰撞到砖块时会摧毁砖块,如果没有用板子接住小球就游戏失败 代码实现 代码比较简单,直接阅读注释即可&#x…

抖音小游戏画图位置移动

文章目录 画图移动图形位置 画图 const canvas tt.createCanvas(); const context canvas.getContext(2d);context.width 500; context.height 500;let isPressing false; // 是否按下 let startX 0; let startY 0;context.fillStyle "#f00"; context.fillR…

骨传导耳机哪个牌子的最好?全面测评分享5大热门骨传导耳机

在当今快节奏的生活中,人们越来越重视健康与休闲的平衡,而音乐则是连接这两者的重要桥梁。对于经常进行户外活动或锻炼的人来说,传统入耳式耳机可能存在安全隐患,这时,骨传导耳机便成为了理想的选择。骨传导技术通过振…

82.【C语言】数据结构之顺序表

在软件开发中,存储列表常用顺序表或链表 1.线性表 定义:n个具有相同特性的数据元素的有限序列(相当于一条直线)(用数组存储),要求数据依次存储 2.分类 1.静态顺序表:使用定长数组存储元素 代码示例(写入Seqlist.h中) typedef int SLDataType;//将int重定义为SL…

Java:玩家打怪小游戏

今天,我们尝试用Java来做一个“打怪小游戏”,听名字就知道,我们是应该创建几个成员和怪物,还有知道知道成员和怪物的血量,一次攻击的伤害等等。。当然我们的游戏攻击模式是“回合制”(其实是别的方法&#…

云开发 | 微信小程序云开发无法获取数据库数据

1.我在我的云数据库中创建了一个数据表(即collection数据集)userList,并且存入了两条用户信息数据 2. 想要通过按钮触发事件拿取数据库中数据并且打印在控制台时,获取数据失败,控制台无输出 3. 初始化 | 在开始使用数据库 API 进…

androidStudio编译导致的同名.so文件冲突问题解决

files found with path lib/arm64-v8a/libserial_port.so from inputs: ...\build\intermediates\library_jni\debug\jni\arm64-v8a\libserial_port.so C:\Users\...\.gradle\caches\transforms-3\...\jni\arm64-v8a\XXX.so 解决方式如下: 1.将gradle缓存文件删…

Linux系统——lvm逻辑卷

Linux系统——lvm逻辑卷 一、lvm逻辑卷1、lvm操作流程2、操作指令 二、逻辑卷操作1、创建逻辑卷1.1 /dev/cloud/openstack 5G xfs /cloud/openstack1.2 /dev/cloud/docker 10G ext4 /cloud/docker 2、逻辑卷扩容2.1 扩容流程2.2 需求一:扩容ext4文件系统的逻辑卷2.3…

新手给视频加字幕的方法有哪些?4种加字幕方法推荐!

在视频制作中,字幕不仅是传递信息的重要手段,还能增强视频的观感和专业性。对于新手来说,如何给视频添加字幕可能是一个挑战。本文将介绍字幕的类型、推荐添加字幕的工具,以及详细添加字幕方法,帮助新手轻松掌握视频字…

宠物咖啡馆业务自动化:SpringBoot框架的实现方法

3系统分析 3.1可行性分析 通过对本基于Spring Boot的宠物咖啡馆平台的设计与实现实行的目的初步调查和分析,提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于Spring Boot的宠物咖啡馆…

微前端 Spa qiankun

简介 首先什么是微前端? 他是一个软件架构模式。借鉴了后端的为服务架构思想,是将复杂单一的前端进行拆分成多个可以独立开发、部署、维护的小型应用。不同的应用关注不同的业务。最终将其集成到一个主框架里面。简单来说就是先分后合。 传统前端开发的…

【Unity - 屏幕截图】技术要点

在Unity中想要实现全屏截图或者截取某个对象区域的图片都是可以通过下面的函数进行截取 Texture2D/// <summary>/// <para>Reads the pixels from the current render target (the screen, or a RenderTexture), and writes them to the texture.</para>/…

【氮化镓】低温对p-GaN HEMT迁移率、阈值电压和亚阈值摆幅的影响

本期分享一篇低温对p-GaN HEMT 迁移率、阈值电压和亚阈值摆幅影响进行表征和建模的研究论文。文章作者Shivendra Kumar Singh、Thien Sao Ngo、Tian-Li Wu(通讯作者)和Yogesh Singh Chauhan,分别来资源中国台湾阳明交通大学国际半导体技术学院、印度理工学院坎普尔分校电气工…