Spring Boot与RSocket实现高效实时数据通信

作者介绍:✌️大厂全栈码农|毕设实战开发,专注于大学生项目实战开发、讲解和毕业答疑辅导。

 推荐订阅精彩专栏 👇🏻 避免错过下次更新

Springboot项目精选实战案例

更多项目:CSDN主页YAML墨韵

学如逆水行舟,不进则退。学习如赶路,不能慢一步。

目录

 推荐文章

1. 概述

2. 协议简介

Connecting

发出请求(Making Requests)

消息格式

3. 应用场景

4. 实战案例

依赖配置

客户端消息处理模板类

4.1 Request-Response模式

服务端

客户端

消息发送

运行结果

4.2 Request-Stream模式

服务端

客户端

运行结果

4.3 Channel

服务端

客户端

运行结果

4.4 Fire-and-Forget

服务端

客户端

运行结果


 推荐文章

使用RabbitMQ消息队列和Redis缓存优化Spring Boot秒杀功能

Spring Boot + 支付宝支付:一站式集成指南

Spring Boot整合Elasticsearch

Spring Boot与RabbitMQ整合:实现高可用消息队列服务

Spring Boot携手OAuth2.0,轻松实现微信扫码登录!

快速上手Spring Boot与Mybatis Plus集成

《布隆过滤器:原理、应用与使用方法深度解析》

《深度解析:Redis缓存穿透、击穿与雪崩的区别及应对策略》

Spring框架九大核心功能全面揭秘(一)

权威解析Spring框架九大核心功能(续篇):专业深度,不容错过

Spring框架九大核心功能全面解读(三):探寻功能之巅

揭秘Spring Boot中@Transactional注解失效的七大坑点与修复之道

RabbitMQ Spring Boot 配置与使用指南

Spring Boot集成RabbitMQ实现消息队列生产者与消费者


1. 概述

RSocket 是一种二进制协议,可用于 TCP、WebSockets 和 Aeron 等字节流传输的应用协议,具有以下交互模型:

  • Request-Response:    发送一条信息,接收一条信息。

  • Request-Stream:       发送一条消息并接收返回的消息流。

  • Channel:                双向发送消息流。

  • Fire-and-Forget:     发送单向消息。

一旦建立了初始连接,“客户端”与“服务器”之间的区别就消失了,因为两端变得对称,每一端都可以发起上述交互之一。这就是为什么在协议中称参与端的为“请求者”和“响应者”,而上述交互称为“请求流”或简单地称为“请求”。

RSocket 协议的主要特点和优势:

  • 跨网络边界的响应式流语义——对于流请求,如请求流和通道,背压信号在请求者和响应者之间传递,允许请求者在源端减慢响应者的速度,从而减少对网络层拥塞控制的依赖,以及在网络层或任何层缓冲的需要。

  • 请求节流(Request throttling)——这个特性被命名为“租赁”(LEASE),以从两端发送的租赁帧命名,用于限制给定时间内另一端允许的请求总数。租约是定期更新的。

  • 会话恢复——这是为连接丢失而设计的,需要维护一些状态。状态管理对应用程序是透明的,并与背压相结合,可以在可能的情况下停止生产者并减少所需的状态量。

  • 大消息的碎片化和重组。

  • 保活(心跳)。

在Java中基于 Project Reactor 和用于传输的 Reactor Netty 构建。

2. 协议简介

Connecting

最初,客户端通过 TCP 或 WebSocket 等低级流传输方式连接服务器,并向服务器发送 SETUP 帧,以设置连接参数。服务器可能会拒绝 SETUP 帧,但一般情况下,在发送(客户端)和接收(服务器)SETUP 帧后,双方都可以开始发出请求,除非 SETUP 表示使用租用语义来限制请求数量,在这种情况下,双方都必须等待另一端发出 LEASE 帧才能允许发出请求。

发出请求(Making Requests)

建立连接后,双方可通过 REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF 帧之一发起请求。每个帧都会从请求者向响应者发送一条信息。然后,响应者可以返回带有响应信息的 PAYLOAD 帧,而在 REQUEST_CHANNEL 的情况下,请求者也可以发送带有更多请求信息的 PAYLOAD 帧。

当一个请求涉及到诸如request - stream和Channel这样的消息流时,响应者必须遵守来自请求者的需求信号。需求表示为若干消息。初始需求在REQUEST_STREAM和REQUEST_CHANNEL帧中指定。后续的请求通过REQUEST_N帧发出。

每一端也可以通过METADATA_PUSH帧发送元数据通知,这些元数据通知不属于任何单独的请求,而是属于整个连接。

消息格式

RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以有不同的格式。每种格式的 MIME 类型都在设置框架中声明,并适用于给定连接上的所有请求。

虽然所有报文都可以包含元数据,但路由等元数据通常是按请求提供的,因此只包含在请求的第一条报文中,即 REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF 框架之一。

3. 应用场景

  1. 游戏开发:游戏需要实时、低延迟的通信,以提供更好的用户体验。RSocket提供了一种可靠、高效的通信方式,可以在不同的游戏组件之间进行通信,例如服务器和客户端之间的通信,或者客户端与客户端之间的通信。

  2. 实时流媒体:实时流媒体应用需要将大量的数据从服务器传输到客户端,并且需要保证数据的实时性和稳定性。RSocket提供了一种可靠的数据传输方式,可以保证数据的有序性和完整性。

  3. 物联网(IoT):物联网设备需要通过网络进行通信,以实现设备的远程控制和数据采集。RSocket可以用于建立可靠、高效的连接,使得设备可以快速地传输和接收数据。

  4. 分布式系统:分布式系统需要将不同的组件连接在一起,以实现协同工作。RSocket提供了一种可靠、高效的通信方式,可以用于在不同的组件之间进行通信。

  5. 实时数据分析:实时数据分析需要对大量的数据进行实时处理和分析。RSocket可以用于将数据从数据源传输到处理节点,并保证数据的实时性和完整性。

4. 实战案例

案例会分别演示RSocket的4种交换模式。

依赖配置

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

新建两个项目,如下:

服务端配置

spring:
  rsocket:
    server:
      port: 9898
      transport: tcp

客户端消息处理模板类

@Service
public class PackRSocketService {
  private final RSocketRequester rsocketRequester;
  public PackRSocketService(RSocketRequester.Builder rsocketRequesterBuilder) {
    this.rsocketRequester = rsocketRequesterBuilder.tcp("localhost", 9898) ;
  }
}

接下来就会基于上面的配置进行各种交互模式的演示。

4.1 Request-Response模式

服务端
@MessageMapping("message")
public Mono<String> handleMessage(Mono<String> message) {
    return message.doOnNext(msg -> {
      System.out.printf("接收到消息:%s%n", msg) ;
    }).map(msg -> "服务器成功收到了你的消息!!!") ;
}
客户端
// Request-Response 发送一条信息,接收一条信息。
public void sendMessage(String body) {
   this.rsocketRequester
     .route("message")
     .data(body)
     .retrieveMono(String.class)
     .subscribe(System.out::println) ;
}
消息发送
@GetMapping("/message")
public Object message() {
  pss.sendMessage(String.valueOf(System.nanoTime())) ;
  return "message" ;
}
运行结果

4.2 Request-Stream模式

服务端
// 必须返回Flux
@MessageMapping("stream")
public Flux<String> handleStream() {
  return Flux
      .interval(Duration.ofSeconds(2))
      // 随机生成
      .map(i -> String.valueOf(new Random().nextInt(10000000)))
      // 只在此通道中获取10个值
      .take(10)
      .doOnComplete(() -> {
        System.out.println("completed...") ;
      }) ;
}
客户端
// Request-Stream 发送一条消息并接收返回的消息流。
public void sendStream() {
  this.rsocketRequester
    .route("stream")
    .retrieveFlux(String.class)
    .subscribe(ret -> {
      System.out.printf("%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
    }) ;
}
运行结果

4.3 Channel

服务端
@MessageMapping("channel")
public Flux<String> handleChannel(Flux<String> datas) {
  return datas.doOnNext(ret -> {
    System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
  }).map(ret -> {
    return ret + " - " + new Random().nextInt(1000) ;
  }) ;
}
客户端
// Channel 双向发送消息流。
public void sendChannel() {
  this.rsocketRequester
    .route("channel")
    .data(Flux.just("1", "2", "3", "4", "5", "6").delayElements(Duration.ofSeconds(1)))
    .retrieveFlux(String.class)
    .subscribe(ret -> {
      System.out.printf("【client】%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;
    }) ;
}
运行结果

4.4 Fire-and-Forget

服务端
@MessageMapping("faf")
public Mono<Void> handleFireAndForget(Mono<String> data) {
  return data.doOnNext(ret -> {
    System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;
  }).then() ;
}
客户端
// Fire-and-Forget 发送单向消息。
public void sendFireAndForget() {
  this.rsocketRequester
    .route("faf")
    .data(Mono.just(String.valueOf(new Random().nextInt(1000))))
    .send()
    .subscribe() ;
}
运行结果

总结:选择RSocket还是HTTP作为微服务间通信方式需要根据具体的业务场景和需求进行综合考虑。如果对性能、低延迟和双向通信有较高要求,RSocket可能更适合;如果对成熟度、简单性和跨平台性,HTTP可能更适合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/614161.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode-2960. 统计已测试设备【数组 模拟】

LeetCode-2960. 统计已测试设备【数组 模拟】 题目描述&#xff1a;解题思路一&#xff1a;模拟解题思路二&#xff1a; 一次遍历&#xff0c;简洁写法解题思路三&#xff1a;0 题目描述&#xff1a; 给你一个长度为 n 、下标从 0 开始的整数数组 batteryPercentages &#xf…

这套英文可视化界面,真的在UI设计上给我很多启发。

设计师在追求高颜值的英文可视化UI界面时&#xff0c;可以从以下几个方面获取启发&#xff1a; 1. 布局与排版&#xff1a; 观察一些知名的英文可视化UI界面&#xff0c;可以启发设计师对于页面布局和文本排版的设计。例如&#xff0c;可以关注页面元素的对齐方式、间距设置、…

2024高安全个人密码本程序源码,贴身密码管家-随机密码备忘录二代密码

项目概述&#xff1a; 在这个网络高度发展的时代&#xff0c;每个人都需要上网&#xff0c;而上网就不可避免地需要使用账号和密码。 在众多账号的情况下&#xff0c;你是否还在为复杂难记的密码感到烦恼&#xff1f;现在只需要记录一次&#xff0c; 就可以随时查看你的密码…

前端笔记-day02

文章目录 01-无序列表02-有序列表03-定义列表04-表格06-表格-合并单元格07-表单-input08-表单-input占位文本09-表单-单选框10-表单-上传多个文件11-表单-多选框12-表单-下拉菜单13-表单-文本域14-表单-label标签15-表单-按钮16-无语义-span和div17-字体实体19-注册登录页面 01…

分布式与一致性协议之一致哈希算法(二)

一致哈希算法 使用哈希算法有什么问题 通过哈希算法&#xff0c;每个key都可以寻址到对应的服务器&#xff0c;比如&#xff0c;查询key是key-01,计算公式为hash(key-01)%3,警告过计算寻址到了编号为1的服务器节点A&#xff0c;如图所示。 但如果服务器数量发生变化&#x…

嵌入式人工智能是一个怎样的概念呢?

嵌入式人工智能将会是未来几年人工智能发展的主要方向之一&#xff0c;并且会伴随着一系列的职位和角色的出现。虽然目前还没有嵌入式人工智能的确切定义&#xff0c;但随着人工智能的不断发展&#xff0c;它势必会延伸到边缘、终端和嵌入式市场。 嵌入式人工智能具有速度快、功…

NSSCTF | [SWPUCTF 2021 新生赛]jicao

打开题目&#xff0c;发现高亮显示了一个 php 脚本 这是脚本的内容 <?php highlight_file(index.php); include("flag.php"); $id$_POST[id]; $jsonjson_decode($_GET[json],true); if ($id"wllmNB"&&$json[x]"wllm") {echo $flag;…

共享旅游卡免费旅游真实反馈,有图有真相?

新伙伴体验&#xff0c;云南昆大丽6天5晚品质双人游&#xff0c;真实反馈&#xff01;珠海伙伴蔡总&#xff0c;加入千益畅行共享旅游卡团队&#xff0c;自己亲自体验“云南昆大丽6天5晚品质双人游”真实反馈&#xff0c;分享全程内容截图&#xff0c;无半点虚假&#xff01; …

uniapp——点赞、取消点赞

案例 更新点赞状态&#xff0c;而不是每次都刷新整个列表。避免页面闪烁&#xff0c;提升用户体验 代码 <view class"funcBtn zan" click"onZan(index,item.id)"><image src"/static/images/circle/zan.png" mode"aspectFill&…

C# WinForm —— 15 DateTimePicker 介绍

1. 简介 2. 常用属性 属性解释(Name)控件ID&#xff0c;在代码里引用的时候会用到,一般以 dtp 开头Format设置显示时间的格式&#xff0c;包含Long&#xff1a; Short&#xff1a; Time&#xff1a; Custom&#xff1a;采用标准的时间格式 还是 自定义的格式CustomFormat自定…

干货教程【AI篇】| 目前全球最强AI换脸工具swapface详细图文教程及整合包下载

需要这个工具整合包的小伙伴可以关注一下文章底部公众号&#xff0c;回复关键词【swapface】即可获取。 从我们的链接下载&#xff0c;得到这个exe文件 双击运行即可进入安装界面 如下图所示已经在安装中啦 安装好之后我们根据上面的安装路径找到要执行的文件 双击红框中的…

【VTKExamples::Rendering】第五期 环形阵列Rotations

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ:870202403 公众号:VTK忠粉 前言 本文分享VTK样例环形阵列Rotations,希望对各位小伙伴有所帮助! 感谢各位小伙伴的点赞+关注,小易会继续努力分享,一起进步! 你的点赞就是我的动力(^U^)ノ~YO 1. Rotations

OIDC 与 OAuth2.0学习

OpenID Connect (OIDC) 和 OAuth 2.0 是两种不同的协议&#xff0c;它们通常一起使用&#xff0c;但服务于不同的目的。下面是它们的 主要区别和联系&#xff1a; OAuth 2.0 OAuth 2.0 是一个授权框架&#xff0c;它允许第三方应用代表用户获取对服务器资源的有限访问权限。…

PHP高级教程

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;PHP &#x1f4da;参考教程&#xff1a;菜鸟\编程网❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、PHP 多维数组 二、PHP date&#xff08;&#…

进程间通信(一)

IPC 在之前我们也有涉及到进程间通信的知识点&#xff0c;比如fork或exec或父进程读取子进程的退出码等&#xff0c;但是这种通信方式很有限&#xff0c;今天来学习进程间通信的其他技术——IPC&#xff08;InterProcess Communication&#xff09;。 IPC的方式通常有管道&…

【比邻智选】MF871U模组

&#x1f680;搭载国产芯&#xff0c;严苛测试&#xff0c;稳定可靠 &#x1f6e0;️R16特性加持&#xff0c;5G LAN&#xff0c;纳秒级精度 &#x1f310;超低成本&#xff0c;丰富协议&#xff0c;连接无界限

linux安装配置Docker保姆级教程

Docker到底是什么? Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows操作系统的机器上&#xff0c;也可以实现虚拟化。 容器是完全使用沙箱机制&#xff0c;相互之间…

[AutoSar]lauterbach_001_ORTI_CPUload_Trace

目录 关键词平台说明一、ORTI概述二、ORTI文件的生成三、ORTI文件的导入四、Trace 功能4.1 Trace 功能菜单介绍4.2 Trace功能的配置4.3 Trace MCDS 设置4.4 Task Switches断点的设置4.5 Trace 数据的录取4.6 CPU 负载和Task调度的查看 关键词 嵌入式、C语言、autosar、OS、BSW…

智慧公厕建设,打造智慧城市基础设施新亮点

公共厕所是城市基础设施的重要组成部分&#xff0c;而智慧公厕的建设则是现代城市管理的创新之举。为了实现公厕的精细化管理和提供更便捷的服务&#xff0c;推进智慧公厕建设必须要实现技术融合、业务融合、数据融合的目标&#xff0c;跨越层级、地域、系统、部门和业务的限制…

LabVIEW的MEMS电容式压力传感器测试系统

LabVIEW的MEMS电容式压力传感器测试系统 针对传统微惯性测量单元(MIMU)标定方法存在的过程繁琐、标定周期长及设备复杂等问题&#xff0c;提出了一种基于LabVIEW软件的MIMU误差参数快速标定方法。通过软件上位机控制小型三轴转台&#xff0c;配合卡尔曼滤波器技术&#xff0c;…