使用 ResponseBodyEmitter 实现异步响应式数据流处理

1. 概述

1.1 什么是 ResponseBodyEmitter

ResponseBodyEmitter 是 Spring MVC 提供的一个接口,用于支持异步返回响应数据流。它允许在控制器方法中逐步发送数据给客户端,而无需一次性生成完整的响应。

1.2 使用场景

  • 实时数据推送(如股票行情、聊天消息等)。
  • 大量数据分批传输。
  • 服务器发送事件(SSE, Server-Sent Events)。

1.3 优势与局限性

优势:

  • 支持异步数据流处理。
  • 能够实时更新客户端数据。
  • 简化了复杂数据流的管理。

局限性:

  • 高并发场景下需要额外优化。
  • 客户端断开连接时需手动处理资源释放。

2. 环境准备

2.1 添加依赖

确保项目中包含以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2.2 配置 Spring Boot 项目

创建一个标准的 Spring Boot 项目,并配置好基础环境。

3. 基本使用方法

3.1 创建控制器

定义一个控制器类,用于处理 HTTP 请求。

3.2 返回 ResponseBodyEmitter 对象

通过返回 ResponseBodyEmitter 对象实现异步数据流。

3.3 发送数据给客户端

使用 emitter.send() 方法向客户端发送数据。

示例代码:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/stream")
public class StreamController {
   

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @GetMapping("/events")
    public ResponseBodyEmitter handleEvents() {
   
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        // 使用线程池管理异步任务
        executorService.execute(() -> {
   
            try {
   
                for (int i = 0; i < 5; i++) {
   
                    // 模拟延迟
                    TimeUnit.SECONDS.sleep(1);
                    // 发送数据给客户端
                    emitter.send("Event " + i + "\n");
                }
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
   
                // 发生错误时处理
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

说明:

  • 使用 ExecutorService 管理异步任务,避免直接创建线程。
  • TimeUnit.SECONDS.sleep(1) 模拟每秒发送一次数据。
  • emitter.send("Event " + i + "\n") 发送数据给客户端。
  • emitter.complete() 完成数据发送。
  • emitter.completeWithError(e) 处理异常。

4. 实现服务器发送事件(SSE)

4.1 SSE 简介

SSE 是一种基于 HTTP 的协议,允许服务器向客户端推送实时更新的数据。

4.2 使用 ResponseBodyEmitter 实现 SSE

通过设置响应头 Content-Type: text/event-stream,可以实现 SSE。

示例代码:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RestController
@RequestMapping("/sse")
public class SseController {
   

    private final ExecutorService executorService = Executors.newFixedThreadPool(10);

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
   
        SseEmitter emitter = new SseEmitter();

        // 使用线程池管理异步任务
        executorService.execute(() -> {
   
            try {
   
                for (int i = 0; i < 5; i++) {
   
                    // 模拟延迟
                    TimeUnit.SECONDS.sleep(1);
                    // 发送数据给客户端
                    emitter.send(SseEmitter.event().name("message").data("Event " + i));
                }
                // 完成发送
                emitter.complete();
            } catch (IOException | InterruptedException e) {
   
                // 发生错误时处理
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}

说明:

  • 使用 SseEmitter 实现 SSE。
  • MediaType.TEXT_EVENT_STREAM_VALUE 设置响应头为 text/event-stream
  • emitter.send(SseEmitter.event().name("message").data("Event " + i)) 发送带有名称的数据。
  • emitter.complete() 完成数据发送。
  • emitter.completeWithError(e) 处理异常。

4.3 客户端代码示例

HTML 示例:

<!DOCTYPE html>
<html>
<head>
    <title>SSE Example</title>
</head>
<body>
    <div id="events"></div>
    <script>
        const eventSource = new EventSource('/sse/stream');
        eventSource.onmessage = function(event) {
     
            document.getElementById('events').innerHTML += event.data + '<br>';
        };
        eventSource.onerror = function(err) {
     
            console.error("EventSource failed:", err);
        };
    </script>
</body>
</html>

说明:

  • 使用 EventSource 连接到 SSE 流。
  • eventSource.onmessage 处理接收到的数据。
  • eventSource.onerror 处理错误。

5. 异步数据推送的最佳实践

5.1 数据流管理

  • 使用线程池管理异步任务,避免资源耗尽。
  • 设置合理的超时时间,防止连接长时间占用。

示例代码:

import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983536.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(最新教程)Cursor Pro订阅升级开通教程,使用支付宝订阅Cursor Pro Plus

一、如何使用Cursor &#xff1f; 目前要使用Cursor - The AI Code Editor&#xff0c;直接去下载安装就可以了&#xff0c;不过基础版只能用两周&#xff0c;如果需要继续使用&#xff0c;就要订阅pro plus或者企业版了。 二、如何订阅Cursor Pro Plus &#xff1f; 因为基础…

Cursor 使用经验,一个需求开发全流程

软件开发中 Cursor 的使用经验成为关注焦点&#xff0c;尤其是处理大型数据集的需求。用户提到“Cursor 使用经验&#xff0c;一个需求开发全流程”&#xff0c;但“Cursor”可能指数据库游标&#xff0c;涉及逐行处理数据。本文将详细探讨开发一个需求的完整流程&#xff0c;包…

vue2实现组件库的自动按需引入,unplugin-auto-import,unplugin-vue-components

1.使用ant-design-vue或者element-ui时&#xff0c;如何每个组件都去import导入组件&#xff0c;大大降低了开发效率&#xff0c;如果全局一次性注册会增加项目体积&#xff0c;那么如何实现既不局部引入&#xff0c;也不全局注册&#xff1f; 2.在element-plus官网看到有说明…

蓝桥杯备赛:一道数学题(练思维(同余的应用))

题目&#xff1a;请问由1-8组成的8位数中有多少个数字可以被1111整除&#xff1f; 首先这道题目看着很难&#xff0c;如果我们直接用代码做的话&#xff0c;也要跑很久&#xff0c;那能不呢想想有什么样的思路可以巧妙一点解开这道题目呢&#xff1f; 有的兄弟有的 这道题目的…

[Lc7_分治-快排] 快速选择排序 | 数组中的第K个最大元素 | 库存管理 III

目录 1. 数组中的第K个最大元素 题解 代码 2.库存管理 III 代码 1. 数组中的第K个最大元素 题目链接&#xff1a;215. 数组中的第K个最大元素 题目分析&#xff1a; 给定整数数组 nums 和整数 k&#xff0c;请返回数组中第 k 个最大的元素。 请注意&#xff0c;你需要…

Unity引擎使用HybridCLR(华佗)热更新

大家好&#xff0c;我是阿赵。   阿赵我做手机游戏已经有十几年时间了。记得刚开始从做页游的公司转到去做手游的公司&#xff0c;在面试的时候很重要的一个点&#xff0c;就是会不会用Lua。使用Lua的原因很简单&#xff0c;就是为了热更新。   热更新游戏内容很重要。如果…

【神经网络】python实现神经网络(一)——数据集获取

一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中&#xff0c;我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程&#xff0c;现在我们正式开始进行代码的实现&#xff0c;首先我们来实现第一步的运算过程模拟讲解&#xff1a;正向传导。本次代…

【Linux】冯诺依曼体系与操作系统理解

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;Linux 目录 前言 一、冯诺依曼体系结构 二、操作系统 1. 操作系统的概念 2. 操作系统存在的意义 3. 操作系统的管理方式 4. 补充&#xff1a;理解系统调用…

HTML-网页介绍

一、网页 1.什么是网页&#xff1a; 网站是指在因特网上根据一定的规则&#xff0c;使用 HTML 等制作的用于展示特定内容相关的网页集合。 网页是网站中的一“页”&#xff0c;通常是 HTML 格式的文件&#xff0c;它要通过浏览器来阅读。 网页是构成网站的基本元素&#xf…

STM32——GPIO介绍

GPIO(General-Purpose IO ports,通用输入/输出接口)模块是STM32的外设接口的核心部分,用于感知外界信号(输入模式)和控制外部设备(输出模式),支持多种工作模式和配置选项。 1、GPIO 基本结构 STM32F407 的每个 GPIO 引脚均可独立配置,主要特性包括: 9 组 GPIO 端口…

字节码是由什么组成的?

Java字节码是Java程序编译后的中间产物&#xff0c;它是一种二进制格式的代码&#xff0c;可以在Java虚拟机&#xff08;JVM&#xff09;上运行。理解字节码的组成有助于我们更好地理解Java程序的运行机制。 1. Java字节码是什么&#xff1f; 定义 Java字节码是Java源代码经过…

链表算法题目

1.两数相加 两个非空链表&#xff0c;分别表示两个整数&#xff0c;只不过是反着存储的&#xff0c;即先存储低位在存储高位。要求计算这两个链表所表示数的和&#xff0c;然后再以相同的表示方式将结果表示出来。如示例一&#xff1a;两个数分别是342和465&#xff0c;和为807…

blender学习25.3.8

【04-进阶篇】Blender材质及灯光Cycle渲染&后期_哔哩哔哩_bilibili 注意的问题 这一节有一个大重点就是你得打开显卡的渲染&#xff0c;否则cpu直接跑满然后渲染的还十分慢 在这里你要打开GPU计算&#xff0c;但是这还不够 左上角编辑&#xff0c;偏好设置&#xff0c;系…

【godot4.4】布局函数库Layouts

概述 为了方便编写一些自定义容器和控件、节点时方便元素布局&#xff0c;所以编写了一套布局的求取函数&#xff0c;统一放置在一个名为Layouts的静态函数库中。 本文介绍我自定义的一些布局计算和实现以及函数编写的思路&#xff0c;并提供完整的函数库代码&#xff08;持续…

Windows下配置Conda环境路径

问题描述&#xff1a; 安装好Conda之后&#xff0c;创建好自己的虚拟环境&#xff0c;同时下载并安装了Pycharm&#xff0c;但在Pycharm中找不到自己使用Conda创建好的虚拟环境。显示“Conda executable is not found” 解决办法&#xff08;依次尝试以下&#xff09; 起初怀…

OpenHarmony子系统开发编译构建指导

OpenHarmony子系统开发编译构建指导 概述 OpenHarmony编译子系统是以GN和Ninja构建为基座&#xff0c;对构建和配置粒度进行部件化抽象、对内建模块进行功能增强、对业务模块进行功能扩展的系统&#xff0c;该系统提供以下基本功能&#xff1a; 以部件为最小粒度拼装产品和独…

leetcode日记(80)复原IP地址

只能说之前动态规划做多了&#xff0c;看到就想到动态规划&#xff0c;然后想想其实完全不需要&#xff0c;回溯法就行了。 一开始用了很多莫名其妙的代码&#xff0c;写的很复杂……&#xff08;主要因为最后不能加‘.’&#xff09;其实想想只要最后加入vector时去掉最后一个…

LINUX网络基础 [五] - HTTP协议

目录 HTTP协议 预备知识 认识 URL 认识 urlencode 和 urldecode HTTP协议格式 HTTP请求协议格式 HTTP响应协议格式 HTTP的方法 HTTP的状态码 ​编辑HTTP常见Header HTTP实现代码 HttpServer.hpp HttpServer.cpp Socket.hpp log.hpp Makefile Web根目录 H…

【A2DP】SBC 编解码器互操作性要求详解

目录 一、SBC编解码器互操作性概述 二、编解码器特定信息元素(Codec Specific Information Elements) 2.1 采样频率(Sampling Frequency) 2.2 声道模式(Channel Mode) 2.3 块长度(Block Length) 2.4 子带数量(Subbands) 2.5 分配方法(Allocation Method) 2…

电脑内存智能监控清理,优化性能的实用软件

软件介绍 Memory cleaner是一款内存清理软件。功能很强&#xff0c;效果很不错。 Memory cleaner会在内存用量超出80%时&#xff0c;自动执行“裁剪进程工作集”“清理系统缓存”以及“用全部可能的方法清理内存”等操作&#xff0c;以此来优化电脑性能。 同时&#xff0c;我…