Spring Boot WebFlux 中 WebSocket 生命周期解析

Spring Boot WebFlux 中的 WebSocket 提供了一种高效、异步的方式来处理客户端与服务器之间的双向通信。WebSocket 连接的生命周期包括连接建立、消息传输、连接关闭以及资源清理等过程。此外,为了确保 WebSocket 连接的稳定性和可靠性,我们可以加入重试机制,以处理断开或网络问题时自动重新连接。

1. WebSocket 连接建立

WebSocket 的连接是通过 HTTP 的 Upgrade 机制从普通的 HTTP/HTTPS 请求升级而来的。具体流程如下:

1.1 客户端请求 WebSocket 连接

客户端通过 ws://wss:// 协议来访问 WebSocket 服务器,并发送 HTTP Upgrade 请求头,要求服务器将连接升级为 WebSocket 协议:

GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: random-generated-key
Sec-WebSocket-Version: 13

1.2 服务器端处理 WebSocket 连接

Spring WebFlux 通过 WebSocketHandler 来处理 WebSocket 请求。以下是一个简单的 WebSocketHandler 实现:

@Component
public class MyWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
            .doOnNext(message -> System.out.println("Received: " + message.getPayloadAsText()))
            .then();
    }
}

当服务器收到 HTTP Upgrade 请求后,它会检查 Sec-WebSocket-Key 并返回 Sec-WebSocket-Accept 进行握手,建立连接。

1.3 握手成功,连接建立

如果握手成功,服务器会返回 101 Switching Protocols 响应,表示 WebSocket 连接已建立:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: (calculated key)

2. WebSocket 消息处理

连接建立后,WebSocket 进入消息传输阶段,包括消息的接收和发送。

2.1 消息接收

服务器端可以通过 WebSocketSession.receive() 方法来接收客户端发送的消息:

session.receive()
    .map(WebSocketMessage::getPayloadAsText)
    .doOnNext(msg -> System.out.println("Received: " + msg))
    .then();

session.receive() 返回一个 Flux<WebSocketMessage>,可以处理流式消息,每次接收到新消息时执行 doOnNext() 中的处理逻辑。

2.2 消息发送

服务器端可以通过 WebSocketSession.send() 方法发送消息给客户端:

Flux<String> messages = Flux.interval(Duration.ofSeconds(1))
    .map(i -> "Message " + i);
return session.send(messages.map(session::textMessage));

send() 方法接收一个 Publisher<WebSocketMessage>,可以使用 Flux 来生成消息流。textMessage() 方法用于创建文本消息。

3. WebSocket 连接关闭

WebSocket 连接可以由客户端、服务器或网络异常等原因主动关闭。连接关闭的主要方式如下:

3.1 正常关闭

  • 客户端主动关闭:客户端可以通过调用 WebSocket.close() 发送 Close Frame,服务器接收到后会关闭连接。
  • 服务器主动关闭:服务器通过 WebSocketSession.close() 关闭连接:
    session.close(CloseStatus.NORMAL);
    

3.2 异常关闭

  • 网络异常:如网络断开或客户端崩溃等,连接会被强制关闭。
  • 心跳超时:如果使用 ping/pong 机制检测 WebSocket 是否存活,超时未收到 pong 响应时,连接会关闭。
    session.send(Flux.just(session.pingMessage(ByteBuffer.wrap(new byte[0]))));
    

3.3 连接关闭后的处理

服务器可以使用 session.receive().doOnTerminate() 监听连接关闭事件,执行清理操作:

session.receive()
    .doOnTerminate(() -> System.out.println("WebSocket connection closed"))
    .then();

4. WebSocket 生命周期总结

阶段说明
连接建立客户端发起 WebSocket 连接请求,服务器接受并返回 101 Switching Protocols 响应,连接建立。
消息传输服务器和客户端可以双向传输文本或二进制消息。
连接关闭连接可由客户端、服务器、网络异常等原因关闭。
资源清理连接关闭后需要进行资源清理操作,如取消订阅、清理状态等。

5. 完整示例:WebFlux WebSocket 服务器

以下是一个完整的 WebSocket 服务器配置示例,展示了如何在 Spring Boot WebFlux 中配置 WebSocket:

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerMapping;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;

@Configuration
public class WebSocketConfig {

    @Bean
    public WebSocketHandler webSocketHandler() {
        return session -> {
            Flux<String> output = Flux.interval(Duration.ofSeconds(1))
                                     .map(time -> "Server time: " + time);
            return session.send(output.map(session::textMessage));
        };
    }

    @Bean
    public WebSocketHandlerMapping handlerMapping(WebSocketHandler handler) {
        return new WebSocketHandlerMapping(Map.of("/ws", handler));
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

说明:

  • WebSocketHandler 处理 WebSocket 连接,发送定时消息。
  • WebSocketHandlerMapping/ws 端点映射到 WebSocketHandler。
  • WebSocketHandlerAdapter 用于适配 WebSocket 处理器。

6. 服务器端发起 WebSocket 连接

如果你希望服务器主动连接到其他 WebSocket 服务器,可以使用 WebSocketClient。Spring WebFlux 提供了 ReactorNettyWebSocketClient 来发起 WebSocket 连接。

6.1 示例:服务器端发起 WebSocket 连接

import org.springframework.stereotype.Service;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import reactor.core.publisher.Mono;
import java.net.URI;

@Service
public class WebSocketClientService {

    private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    public Mono<Void> connectToWebSocketServer() {
        return client.execute(URI.create("ws://example.com/socket"), session -> {
            Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));

            session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(System.out::println)
                .subscribe();

            return sendMessage;
        });
    }
}

6.2 在 Spring Boot 启动时自动连接

通过在 @PostConstruct 中调用连接方法,可以确保 WebSocket 客户端在 Spring Boot 启动时自动连接:

import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;

@Component
public class WebSocketClientInitializer {

    private final WebSocketClientService webSocketClientService;

    public WebSocketClientInitializer(WebSocketClientService webSocketClientService) {
        this.webSocketClientService = webSocketClientService;
    }

    @PostConstruct
    public void init() {
        webSocketClientService.connectToWebSocketServer()
            .subscribe();
    }
}

7. WebSocket 连接重试机制

在 WebSocket 的生命周期中,由于网络问题或服务器错误,WebSocket 连接可能会中断。为了提高 WebSocket 连接的可靠性,我们可以为 WebSocket 客户端添加重试机制,以确保断开后能够重新连接。

7.1 使用 retry() 方法重试连接

WebFlux 提供了 retry() 方法来自动重试操作。以下是一个简单的重试机制示例:

import reactor.core.publisher.Mono;

public class WebSocketClientService {

    private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    public Mono<Void> connectToWebSocketServer() {
        return client.execute(URI.create("ws://example.com/socket"), session -> {
            Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));

            session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(System.out::println)
                .subscribe();

            return sendMessage;
        }).retry(5);  // 最大重试5次
    }
}

在这个例子中,retry(5) 表示如果 WebSocket 连接失败,最多会重试 5 次。

7.2 使用 retryWhen() 实现自定义重试逻辑

我们还可以通过 retryWhen() 来实现更复杂的重试策略,例如设置重试间隔时间或根据错误类型决定是否重试:

import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.time.Duration;

public class WebSocketClientService {

    private final ReactorNettyWebSocketClient client = new ReactorNettyWebSocketClient();

    public Mono<Void> connectToWebSocketServer() {
        return client.execute(URI.create("ws://example.com/socket"), session -> {
            Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));

            session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(System.out::println)
                .subscribe();

            return sendMessage;
        }).retryWhen(errors ->
            errors.zipWith(Flux.range(1, 5), (error, count) -> count)  // 重试次数
                  .flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount)))  // 增加重试间隔
        );
    }
}

在这个例子中,retryWhen() 会根据错误进行自定义重试逻辑,设置每次重试间隔递增。

8. 连接关闭后的重试机制

为了确保连接在关闭后重新建立,我们可以监听连接关闭事件并尝试重试:

session.receive()
    .doOnTerminate(() -> {
        System.out.println("WebSocket connection closed");
        reconnect();  // 重新连接
    })
    .then();

private void reconnect() {
    connectToWebSocketServer()
        .retry(3)  // 重试3次
        .subscribe();
}

8.1 完整的客户端重试代码

public Mono<Void> connectWithRetry() {
    return client.execute(URI.create("ws://example.com/socket"), session -> {
        Mono<Void> sendMessage = session.send(Mono.just(session.textMessage("Hello Server!")));

        session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(System.out::println)
            .doOnTerminate(() -> reconnect())  // 连接关闭后重试
            .subscribe();

        return sendMessage;
    }).retryWhen(errors ->
        errors.zipWith(Flux.range(1, 5), (error, count) -> count)
              .flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount)))
    );
}

9. 结论

Spring Boot WebFlux 中 WebSocket 的生命周期包括:

  1. 连接建立:通过 HTTP Upgrade 握手建立 WebSocket 连接。
  2. 消息收发:服务器和客户端之间通过 receive()send() 方法进行消息交换。
  3. 连接关闭:连接可以通过正常关闭、异常关闭或主动关闭的方式结束。
  4. 资源清理:连接关闭后需要进行资源清理操作,确保系统稳定。
  5. 重试机制:通过 retry()retryWhen() 方法为 WebSocket 连接添加自动重试机制,提高连接的可靠性。

通过 WebSocket,Spring Boot WebFlux 提供了高效的异步通信方式,特别适合用于实时数据流应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/982344.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(undone) MIT6.S081 Lec14 File systems 学习笔记

url: https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/lec14-file-systems-frans Why Interesting 从一个问题开始&#xff1a;既然你每天都使用了文件系统&#xff0c;XV6的文件系统与你正在使用的文件系统有什么区别。接下来我会点名&#xff1a; 学生回答…

【C++进阶学习】第一讲——继承(下)---深入挖掘继承的奥秘

目录 1.隐藏 1.1隐藏的概念 1.2隐藏的两种方式 2.继承与友元 3、继承与静态成员 4.单继承和多继承 4.1单继承 4.2多继承 5.菱形继承 问题1&#xff1a;冗余性 问题2&#xff1a;二义性 6.虚拟继承 7.总结 1.隐藏 1.1隐藏的概念 在 C 中&#xff0c;继承是一种机制…

UI自动化:利用百度ocr识别解决图形验证码登录问题

相信大家在做自动化测试过程中都遇到过图形验证码的问题&#xff0c;最近我也是遇到了&#xff0c;网上搜了很多方法&#xff0c;最简单的方法无非就是去掉图形验证码或者设置一个万能验证码&#xff0c;但是这个都需要开发来帮忙解决&#xff0c;对于我们这种自学的人来说就不…

C/C++蓝桥杯算法真题打卡(Day1)

一、LCR 018. 验证回文串 - 力扣&#xff08;LeetCode&#xff09; 算法代码&#xff1a; class Solution { public:bool isPalindrome(string s) {int n s.size();// 处理一下s为空字符的情况if (n 0) {return true; // 修正拼写错误}// 定义左右指针遍历字符串int left …

蓝桥杯备考:动态规划路径类DP之矩阵的最小路径和

如题&#xff0c;要求左上角到右下角的最短路径&#xff0c;我们还是老样子按顺序做 step1:确定状态表示 f[i][j]表示(1,1)到(i,j)的最短距离 step2 :推导状态表达方程 step3:确定填表顺序&#xff0c;应该是从上到下&#xff0c;从左到右 step4:初始化 step5 找结果&#…

18类创新平台培育入库!长沙经开区2025年各类科技创新平台培育申报流程时间材料及申报条件

长沙经开区打算申报企业研发中心、技术创新中心、工程技术研究中心、新型研发机构、重点实验室、概念验证中心和中试平台、工程研究中心、企业技术中心、制造业创新中心、工业设计中心等创新平台的可先备案培育入库&#xff0c;2025年各类平台的认定将从培育库中优先推荐&#…

CyberDefenders----WebStrike Lab

WebStrike Lab 实验室链接 简介: 公司网络服务器上发现了一个可疑文件,在内联网中发出警报。开发团队标记了异常,怀疑存在潜在的恶意活动。为了解决这个问题,网络团队捕获了关键网络流量并准备了一个 PCAP 文件以供审查。您的任务是分析提供的 PCAP 文件以发现文件的出现…

【python】gunicorn配置

起因&#xff1a;因为cpu利用率低导致我去缩容&#xff0c;虽然缩容之后cpu利用率上升维持在60%左右&#xff0c;但是程序响应耗时增加了。 解释&#xff1a;因为cpu干这件活本身不累&#xff0c;但在干这件活的时候不能去干其他事情&#xff0c;导致并发的请求不能及时响应&am…

SSE vs WebSocket:AI 驱动的实时通信抉择

引言 近年来,基于 Transformer 的大模型推动了 AI 产业的飞速发展,同时带来了新的技术挑战: 流式传输 vs 批量返回:大模型生成的长文本若需一次性返回,会显著影响用户体验,实时推送成为必需。语音交互需求:语音助手要求毫秒级响应,而非等待用户完整输入后再返回结果。…

基于海思soc的智能产品开发(芯片sdk和linux开发关系)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 随着国产化芯片的推进&#xff0c;在soc领域&#xff0c;越来越多的项目使用国产soc芯片。这些soc芯片&#xff0c;通常来说运行的os不是linux&…

各种DCC软件使用Datasmith导入UE教程

3Dmax: 先安装插件 https://www.unrealengine.com/zh-CN/datasmith/plugins 左上角导出即可 虚幻中勾选3个插件,重启引擎 左上角选择文件导入即可 Blender导入Datasmith进UE 需要两个插件, 文章最下方链接进去下载安装即可 一样的,直接导出,然后UE导入即可 C4D 直接保存成…

VMware Fusion虚拟机Mac版安装Ubuntu系统

介绍 Ubuntu操作系统是一个基于Linux内核的桌面和服务器操作系统。它由Canonical公司开发和维护&#xff0c;是最受欢迎的Linux操作系统之一。Ubuntu操作系统以简洁、直观和易用为设计原则&#xff0c;提供了友好的图形界面&#xff0c;支持多种语言和自定义设置&#xff0c;用…

发行思考:全球热销榜的频繁变动

几点杂感&#xff1a; 1、单机游戏销量与在线人数的衰退是剧烈的&#xff0c;有明显的周期性&#xff0c;而在线游戏则稳定很多。 如去年的某明星游戏&#xff0c;最高200多万在线&#xff0c;如今在线人数是48名&#xff0c;3万多。 而近期热门的是MH&#xff0c;在线人数8…

AI赋能科研绘图与数据可视化高级应用

在科研成果竞争日益激烈的当下&#xff0c;「一图胜千言」已成为高水平SCI期刊的硬性门槛——数据显示很多情况的拒稿与图表质量直接相关。科研人员普遍面临的工具效率低、设计规范缺失、多维数据呈现难等痛点&#xff0c;因此科研绘图已成为成果撰写中的至关重要的一个环节&am…

thingsboard edge 在windows 环境下的配置

按照官方文档&#xff1a;Installing ThingsBoard Edge on Windows | ThingsBoard Edge&#xff0c;配置好java环境和PostgreSQL。 下载对应的windows 环境下的tb-edge安装包。下载附件 接下来操作具体如下 步骤1&#xff0c;需要先在thingsboard 服务上开启edge 权限 步骤2…

最硬核DNS详解

1、是什么 DNS&#xff08;域名系统&#xff09;是互联网的一项服务&#xff0c;它作为将域名和IP地址相互映射的一个分布式数据库&#xff0c;能够使人更方便地访问互联网。DNS协议基于UDP协议&#xff0c;使用端口号53。 2、域名服务器类型 域名服务器在DNS体系中扮演着不…

CentOS 7 安装Nginx-1.26.3

无论安装啥工具、首先认准了就是官网。Nginx Nginx官网下载安装包 Windows下载&#xff1a; http://nginx.org/download/nginx-1.26.3.zipLinxu下载 wget http://nginx.org/download/nginx-1.26.3.tar.gzLinux安装Nginx-1.26.3 安装之前先安装Nginx依赖包、自行选择 yum -y i…

基于国产芯片的AI引擎技术,打造更安全的算力生态 | 京东零售技术实践

近年来&#xff0c;随着国产AI芯片的日益崛起&#xff0c;基于国产AI芯片的模型适配、性能优化以及应用落地是国产AI应用的一道重要关卡。如何在复杂的京东零售业务场景下更好地使用国产AI芯片&#xff0c;并保障算力安全&#xff0c;是目前亟需解决的问题。对此&#xff0c;京…

osg官方例子

osg3.6.5官方例子 osganimate osganimationeasemotion osganimationmakepath osganimationmorph

分享react后台管理系统常见的组件/知识点

前言 虽然各个前端框架的常用组件库已经非常完善&#xff0c;但做具体业务时&#xff0c;一般情况下&#xff0c;我们无法直接套用组件&#xff0c;需要自己进行撰写对应业务逻辑。 这篇文章总结做react表单列表常见的组件/知识点。 注意&#xff1a;本篇仅提供相关功能的核心…