Spring Integration 是什么?

Spring Integration 是什么?

Spring Integration 在 Spring 家族不太有名气,如果不是有需求,一般也不会仔细去看。那么 Spring Integration 是什么呢?用官方的一句话来解释就是:它是一种轻量级消息传递模块,并支持通过声明式适配器与外部系统集成。简单来说,Spring Integration 抽象了用于消息传递的一套规范,并且基于这套规范提供了很多企业级的中间件的集成。比如他支持基于 AMQP 的消息队列、MQTT、RMI 等等中间件。

用过 Spring 家族组件的同学应该会比较容易理解了。例如,Spring Data 抽象了数据访问的一系列接口,后端可支持多种 ORM;Spring Cache 抽象了缓存使用的接口,后端支持 Caffeine、Redis、Memcached 等缓存中间件。其实这都是一样的。好处是,我们只需要熟悉这一种规范,就可以任意的去对接各种企业级框架,起到快速开发的作用;劣势是,这些企业级的框架只能再 Spring 抽象的这套规范下工作,对于一些细节的开发,可能仍然需要使用原生的框架来实现。

本文主要介绍的是 Spring Integration,以及它是如何集成 MQTT 协议的。

Spring Integration 消息抽象

刚刚我们讲了,Spring Integration 实际上就是抽象出了消息传递的规范,然后再适配各种消息中间件。那么下面我们先简单了解下 Spring Integration 消息通信的模式。

image.png

image.png

image.png

image.png

image.png

image.png

以上几张官方提供的图可以大致厘清 Spring Integration 的各类组件和工作模式:

  1. Message 包含 Header 和 Payload 两部分。
  2. MessageChannel 用于解耦生产者和消费者,实现消息发送。
  3. MessageRouter 用来控制消息转发的 Channel。
  4. Service Activitor 用来绑定 MessageHandler 和用于消费消息的 MessageChannel。
  5. ChannelAdapter 用来连接 MessageChannel 和具体的消息端口,例如通信的 topic。

在开发上就需要去了解这些抽象组件的具体实现了,在下面讲到 MQTT 的集成上可以再体会一下 SI 的设计思路。

MQTT 协议

MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT today is used in a wide variety of industries, such as automotive, manufacturing, telecommunications, oil and gas, etc.

这是 MQTT 协议的官方描述,它是一种应用于物联网的轻量级的发布订阅协议,类似于 AMQP。详细了解可以参考:

  • MQTT Specifications
  • [emqx mqtt 协议介绍](docs.emqx.cn/broker/v4.3… 协议)
  • MQTT 协议中文版
  • 消息推送标准协议:MQTT

下面提一些重要的或者开发中需要配置的点。

通信方式

默认是发布 / 订阅模式的。

  1. 通信系统中有发布者和订阅者。发布者发布消息而订阅者接收消息。我们把发布者和订阅者统称为客户端。客户端可以同时是发布者和订阅者。
  2. 在系统中有另外一个角色,它接收发布者的消息并且将消息派发给订阅者。我们一般称这个角色为消息 Broker。
  3. 在 MQTT 中默认是广播的,也就是说订阅了相同 topic 的订阅者都能收到发布者发送的消息。

基于主题 (Topic) 消息路由

MQTT 协议基于主题 (Topic) 进行消息路由,主题 (Topic) 类似 URL 路径,例如:

 

bash

复制代码

chat/room/1 sensor/10/temperature sensor/+/temperature

主题 (Topic) 通过'/'分割层级,支持'+', '#'通配符:

  • '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  • '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c/d
  • 订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

QoS

为了满足不同的场景,MQTT 支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

  • 0:At most once。消息发送者会想尽办法发送消息,但是遇到意外并不会重试。
  • 1:At least once。消息接收者如果没有知会或者知会本身丢失,消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息。
  • 2:Exactly onces。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别 2 是最合适的。

订阅者收到 MQTT 消息的 QoS 级别,最终取决于发布消息的 QoS 和主题订阅的 QoS

Broker 选型

本文使用的 MQTT Broker 是 EMQ X 的开源版。

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。 Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。

客户端代码集成

Java 客户端一般使用 Eclipse Paho Java Client,此客户端为 Java SE 版本的,为了在 SpringBoot 上有更好的集成,这里我们使用 Spring Integration,Spring Integration MQTT Support 默认集成的就是 Eclipse Paho Java Client V3 版本。

依赖和参数配置

 

xml

复制代码

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

 

yml

复制代码

mqtt: url: tcp://172.17.218.94:1883 username: admin password: public clientId: mqtt-sender

 

java

复制代码

@Data @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { private String url; private String username; private String password; private String clientId; }

发布者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { @Autowired private MqttProperties prop; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(new String[]{prop.getUrl()}); mqttConnectOptions.setUserName(prop.getUsername()); mqttConnectOptions.setPassword(prop.getPassword().toCharArray()); // 客户端断线时暂时不清除,直到超时注销 mqttConnectOptions.setCleanSession(false); mqttConnectOptions.setAutomaticReconnect(true); factory.setConnectionOptions(mqttConnectOptions); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(), mqttClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultRetained(false); messageHandler.setAsyncEvents(false); // Exactly Once messageHandler.setDefaultQos(2); messageHandler.setDefaultTopic(ApiConst.MQTT_TOPIC_SUFFIX); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttTemplate { void send(String payload); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos); }

  1. @IntegrationComponentScan,开启 Spring Integration 的注解扫描。
  2. 注入客户端工厂类 MqttPahoClientFactory,此处可以配置认证参数、超时时间等 broker 连接参数。
  3. 注入 MessageChannel 实例。
  4. 注入 MessageHandler 的实例,并通过 @ServiceActivator 绑定到对应的 MessageChannel。此处可配置消息处理的模式、QoS、默认的 Topic 等。
  5. 定义一个 @MessagingGateway 修饰的接口,用于消息的发送,@MessagingGatewaydefaultRequestChannel 参数用于绑定具体的 MessageChannel
  6. 在使用的地方自动注入 MqttTemplate 的实例,即可调用方法发送消息。

订阅者配置

 

java

复制代码

@Configuration @IntegrationComponentScan public class MqttConfig { private final MqttProperties prop; private final MqttInboundMessageHandler mqttInboundMessageHandler; public MqttConfig(MqttProperties prop, MqttInboundMessageHandler mqttInboundMessageHandler) { this.prop = prop; this.mqttInboundMessageHandler = mqttInboundMessageHandler; } @Bean public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(prop.getClientId() + "-sub-" + Instant.now().toEpochMilli(), mqttClientFactory, "facego/reply"); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInboundChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler InboundMessageHandler() { return mqttInboundMessageHandler; } @Bean public MessageChannel mqttInboundChannel() { return new DirectChannel(); } } @Slf4j @Component public class MqttInboundMessageHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("mqtt reply: {}", message.getPayload()); } }

  1. 注入消息处理的 MessageChannel
  2. 注入自己实现的 MqttInboundMessageHandler,并通过 @ServiceActivator 绑定到对应的 MessageChannel
  3. 注入 Channel Adapter 的实例,配置客户端订阅的 Topic 和相应的 MessageChannel

Spring Integration 大致交互逻辑

对于发布者:

  1. 消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
  2. DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

  1. 通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel
  2. 同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定,这里是 MQTT Eclipse Paho Java Client。

总结

本文主要介绍了 Java 使用 MQTT 通信的方式,由于使用了 SpringBoot,因此使用 Spring Integration 来集成会比直接只用 Eclipse Paho Java Client 更符合 Spring 的哲学,所有的 Bean 均单例注入统一管理。

Spring Integration 的好处在于,我们只需要了解其消息通信的基本机制,屏蔽了 Eclipse Paho Java Client 的具体细节,便于编码。从上面的代码可以看出,我们仅仅注入了相关的 Bean,给出响相应的配置信息即可。

参考文献

  • Spring Integration Reference Guide
  • Spring Integration 中文手册(完整版)
  • SpringBoot 集成 MQTT 配置
  • Spring Boot 集成 MQTT
  • 消息推送标准协议:MQTT

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/495931.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

干货分享:品牌如何通过社媒激发年轻人消费力?

随着年轻人的消费愈发理性&#xff0c;年轻人在消费时更偏向于熟人种草场景下的信任决策&#xff0c;社交媒体成为品牌吸引用户的必争之地。今天媒介盒子就来和大家聊聊&#xff1a;品牌如何通过社媒激发年轻人消费力。 一、 激发用户共鸣&#xff0c;与用户产生情感连接。 虽…

通达信指标公式--通达信波段极品抄底指标公式,波段顶部和底部的判断

今日分享的通达信波段极品抄底指标公式&#xff0c;是一个波段顶底的提示指标。 具体信号说明&#xff1a; 当指标信号柱出现洋红柱子时&#xff0c;是波段底的信号&#xff0c;此时是参考持股的信号。 抄底信号出现在黄色直线附近较好&#xff0c;出现在背离走势更好。懂波浪…

【正点原子FreeRTOS学习笔记】————(12)信号量

这里写目录标题 一、信号量的简介&#xff08;了解&#xff09;二、二值信号量&#xff08;熟悉&#xff09;三、二值信号量实验&#xff08;掌握&#xff09;四、计数型信号量&#xff08;熟悉&#xff09;五、计数型信号量实验&#xff08;掌握&#xff09;六、优先级翻转简介…

[优选算法专栏]专题十五:FloodFill算法(一)

本专栏内容为&#xff1a;算法学习专栏&#xff0c;分为优选算法专栏&#xff0c;贪心算法专栏&#xff0c;动态规划专栏以及递归&#xff0c;搜索与回溯算法专栏四部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小…

缓冲区溢出漏洞相关知识点汇总

1.缓冲区基础知识相关定义 缓冲区定义&#xff1a;缓冲区一块连续的内存区域&#xff0c;用于存放程序运行时&#xff0c;加载到内存的运行代码和数据。 缓冲区溢出&#xff1a;缓冲区溢出是指程序运行时&#xff0c;向固定大小的缓冲区写入超过其容量的数据。多余的数据会越…

DFS:从递归去理解深度优先搜索

一、深入理解递归 二、递归vs迭代 三、深入理解搜索、回溯和剪枝 四、汉诺塔问题 . - 力扣&#xff08;LeetCode&#xff09; class Solution { public: //笔试题&#xff0c;不讲武德&#xff0c;CAvoid move(int n,vector<int>& A, vector<int>& B, ve…

充值活动倒计时!快来get您的春日豪礼

春分已至 万物生辉 就在上周末 马拉松赛事霸屏朋友圈 不论是燃动全城的汉马 还是集结万人的锡马 马拉松精神给予我们的是 挑战自我、永不言弃 奋力前行、昂扬向上的力量 在这万物复苏的阳春三月 正是潜心钻研 奋力拼搏的好时节 神工坊为广大仿真行业科技工作者 送上春…

净化室内空气有妙招,约克VRF甲醛净化中央空调给全家人舒适守护

早春3月&#xff0c;春回大地&#xff0c;又到了万物复苏、草长莺飞的季节&#xff0c;但对于我们的呼吸道来说&#xff0c;这又是个高危时期。伴随气温的不断上升&#xff0c;各种细菌、病毒开始活跃起来&#xff0c;同时&#xff0c;春季也是花粉过敏的高发期。无论是甲醛、细…

因子分析全流程汇总

探索性因子分析&#xff08;以下简称因子分析&#xff09;&#xff0c;是毕业论文中常用的数据分析方法&#xff0c;用于研究多个变量之间的关系&#xff0c;并通过提取公共因子来简化数据集。 信息浓缩是因子分析最常见的应用&#xff0c;除此之外&#xff0c;因子分析还可以…

2.3 同步与互斥

1 2 3 4 5 6 7 8 9 10 11 12

【InternLM 实战营第二期笔记】书生·浦语大模型全链路开源体系及InternLM2技术报告笔记

大模型 大模型成为发展通用人工智能的重要途径 专用模型&#xff1a;针对特定任务&#xff0c;一个模型解决一个问题 通用大模型&#xff1a;一个模型应对多种任务、多种模态 书生浦语大模型开源历程 2023.6.7&#xff1a;InternLM千亿参数语言大模型发布 2023.7.6&#…

【ML】类神经网络训练不起来怎么办 5

【ML】类神经网络训练不起来怎么办 5 1. Saddle Point V.S. Local Minima(局部最小值 与 鞍点)2. Tips for training: Batch and Momentum(批次与 动量)2.1 Tips for training: Batch and Momentum2.2 参考文献:2.3 Gradient Descent2.4 Concluding Remarks(前面三讲)3.…

2024年AI威胁场景报告:揭示现今最大的AI安全挑战

AI正彻底改变每一个数据驱动的机会&#xff0c;有可能带来一个繁荣的新时代&#xff0c;让人类的生活质量达到难以想象的高度。但就像任何突破性的新技术一样&#xff0c;伟大的潜力往往蕴含着巨大的风险。 AI在很大程度上是有史以来部署在生产系统中的最脆弱的技术。它在代码…

寒冬继续!飞书发全员信 “适当精简团队规模”

多精彩内容在公众号。 3月26日飞书CEO谢欣发布全员信&#xff0c;宣布进行组织调整&#xff0c;同时为受到影响的“同学”提供补偿方案和转岗机会。 在致员工的一封信中&#xff0c;谢欣坦诚地指出&#xff0c;尽管飞书的团队人数众多&#xff0c;但组织结构的不够紧凑导致了工…

使用HarmonyOS实现图片编辑,裁剪、旋转、亮度、透明度

介绍 本篇Codelab是基于ArkTS的声明式开发范式的样例&#xff0c;主要介绍了图片编辑实现过程。样例主要包含以下功能&#xff1a; 图片的解码。使用PixelMap进行图片编辑&#xff0c;如裁剪、旋转、亮度、透明度、饱和度等。图片的编码。 相关概念 图片解码&#xff1a;读取…

经典机器学习模型(九)EM算法的推导

经典机器学习模型(九)EM算法的推导 1 相关数据基础 1.1 数学期望 1.1.1 数学期望的定义 根据定义&#xff0c;我们可以求得掷骰子对应的期望&#xff1a; E ( X ) X 1 ∗ p ( X 1 ) X 2 ∗ p ( X 2 ) . . . X 6 ∗ p ( X 6 ) 1 ∗ 1 6 2 ∗ 1 6 1 ∗ 1 6 3 ∗ 1 6 …

【考研数学】跟武忠祥,如何搭配汤家凤《1800》?

可以但不建议&#xff01;正所谓原汤化原食&#xff0c;你做1800&#xff0c;当然是听汤神的更合适&#xff01; 汤家凤与武忠祥的讲课风格真的大不相同&#xff01;汤老师特别注重基础和题量&#xff0c;让你在数理思维上打下扎实的根基。而武老师则更偏向于深厚的理论&#…

天地图如何获取多边形面积

目录 一、初始化地图 二、创建polygonTool 三、多边形获取面积 ​四、完整代码&#xff08;包括添加点、添加面、编辑面、获取面积&#xff09; 项目中提出在地图上绘制面并获取面积&#xff0c;如何实现&#xff1f; 在天地图官网的JavaScript API 中&#xff0c;链接如下…

午马传动已确定加入2024第13届生物发酵展

参展企业介绍 浙江午马传动有限公司&#xff0c;办公室地址位于中国长寿之乡、中国椪柑之乡、中国竹炭之乡丽水&#xff0c;浙江省丽水市青田县东源镇项村村前路99号四楼1号&#xff0c;我公司主要提供&#xff1a;齿轮及齿轮减、变速箱制造&#xff1b;机械设备销售&#xff1…

MySQL 8 索引原理详细分析

千山万水总是情, 问问索引行不行? 轻舟已过万重山, 有种尽管来发难。 索引是在数据库优化时的重要手段之一,今天 V 哥从索引的角度展开讲一讲索引的各个要点,希望可以通过这篇文章,帮助大家彻底搞透索引的关键点。 1.索引的定义与作用2.索引的类型3.索引原理4.二分查…