SSE(Server Sent Event)实战(3)- Spring Web Flux 实现

上篇博客 SSE(Server Sent Event)实战(2)- Spring MVC 实现,我们用 Spring MVC 实现了简单的消息推送,并且留下了两个问题,这篇博客,我们用 Spring Web Flux 实现,并且看看这两个问题怎么解决。

一、服务端实现

/*
 * XingPan.com
 * Copyright (C) 2021-2024 All Rights Reserved.
 */
package com.sse.demo2.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author liuyuan
 * @version SseController.java, v 0.1 2024-07-15 14:24
 */
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {

    private static final HttpClient HTTP_CLIENT = HttpClient.create().responseTimeout(Duration.ofSeconds(5));

    private final Map<String, FluxSink<String>> USER_CONNECTIONS = new ConcurrentHashMap<>();

    /**
     * 用来存储用户和本机地址,实际生成请用 redis
     */
    private final Map<String, String> USER_CLIENT = new ConcurrentHashMap<>();

    /**
     * 创建连接
     */
    @GetMapping(value = "/create-connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> createConnect(@RequestParam("userId") String userId) {

        // 获取本机地址
        String hostAddress = this.getHostAddress();

        Flux<String> businessData = Flux.create(sink -> {

            USER_CONNECTIONS.put(userId, sink);
            USER_CLIENT.put(userId, hostAddress);
            log.info("创建了用户[{}]的SSE连接", userId);

            sink.onDispose(() -> {
                USER_CONNECTIONS.remove(userId);
                USER_CLIENT.remove(userId);
                log.info("移除用户[{}]的SSE连接", userId);
            });
        });

        // 创建心跳
        Flux<String> heartbeat = Flux.interval(Duration.ofMinutes(1)).map(tick -> "data: heartbeat\n\n");

        return Flux.merge(businessData, heartbeat);
    }

    /**
     * 发送消息 gateway
     */
    @GetMapping("/send-message-gateway")
    public Mono<RpcResult<Boolean>> sendMessageGateway(@RequestParam("userId") String userId, @RequestParam("message") String message) {

        String userHostAddress = USER_CLIENT.get(userId);
        if (userHostAddress == null) {
            log.info("用户[{}]的SSE连接不存在,无法发送消息", userId);
            return Mono.just(RpcResult.error("10001", "SSE连接不存在,无法发送消息"));
        }

        // 获取本机地址和用户连接地址比较,如果相同,直接使用localhost发消息
        String hostAddress = this.getHostAddress();
        userHostAddress = userHostAddress.equals(hostAddress) ? "localhost" : userHostAddress;
        String baseUrl = "http://" + userHostAddress + ":8080";
        log.info("发送消息 > baseUrl = {}", baseUrl);

        WebClient webClient = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(HTTP_CLIENT))
                .baseUrl(baseUrl)
                .build();

        RpcResult<Boolean> errorResult = RpcResult.error("10002", "消息发送失败");
        return webClient.get()
                .uri("/sse/send-message?userId={userId}&message={message}", userId, message)
                .exchangeToMono(clientResponse -> {
                    if (clientResponse.statusCode().is2xxSuccessful()) {
                        log.info("消息发送成功 > 用户 = {},消息内容 = {}", userId, message);
                        return Mono.just(RpcResult.success(true));
                    } else {
                        log.error("消息发送失败 > 状态码 = {},用户 = {},消息内容 = {}", clientResponse.statusCode().value(), userId, message);
                        return Mono.just(errorResult);
                    }
                }).onErrorResume(error -> {
                    log.error("消息发送失败 > 用户 = {}, 消息内容 = {}, e = ", userId, message, error);
                    return Mono.just(errorResult);
                });
    }

    /**
     * 发送消息
     */
    @GetMapping("/send-message")
    public Mono<Void> sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {

        FluxSink<String> sink = USER_CONNECTIONS.get(userId);
        if (sink != null) {
            try {
                sink.next(message);
                log.info("给用户[{}]发送消息成功: {}", userId, message);
            } catch (Exception e) {
                log.error("向用户[{}]发送消息失败,sink可能已关闭或无效", userId, e);
                USER_CONNECTIONS.remove(userId);
                USER_CLIENT.remove(userId);
            }
        } else {
            log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);
        }

        return Mono.empty();
    }

    private String getHostAddress() {

        String hostAddress = "localhost";
        try {
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress inetAddress = inetAddresses.nextElement();
                    if (!inetAddress.isLoopbackAddress() && !inetAddress.getHostAddress().contains(":") && inetAddress.getHostAddress().startsWith("10.")) {
                        hostAddress = inetAddress.getHostAddress();
                    }
                }
            }
        } catch (SocketException e) {
            log.error("获取主机地址失败", e);
        }

        log.info("获取主机地址 > hostAddress = {}", hostAddress);

        return hostAddress;
    }
}
  1. 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?

在创建连接时/create-connect,增加心跳,只要心跳频率小于超时时间,基本就可以解决这个问题,但是前端要注意隐藏心跳内容。

  1. 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。

a. 将用户id 和用户请求的实例 ip 绑定,我这里用的是Map(USER_CLIENT)存储,生产请换成分布式缓存;
b. 服务端发送消息使用/send-message-gateway接口,这个接口只做消息分发,不真实发送消息。从USER_CLIENT中获取用户所在的实例,然后将请求分发到具体实例;
c. /send-message-gateway将请求打到/send-message,然后给用户推送消息;

二、客户端实现


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Demo</title>
    <script>        document.addEventListener('DOMContentLoaded', function () {
        var userId = "1";

        // 创建一个新的EventSource对象
        var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);

        // 当连接打开时触发
        source.onopen = function (event) {
            console.log('SSE连接已打开');
        };

        // 当从服务器接收到消息时触发
        source.onmessage = function (event) {
            // event.data 包含服务器发送的文本数据
            console.log('接收到消息:', event.data);
            // 在页面上显示消息
            var messagesDiv = document.getElementById('messages');
            if (messagesDiv) {
                messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data
            } else {
                console.error('未找到消息容器元素');
            }
        };

        // 当发生错误时触发
        source.onerror = function (event) {
            console.error('SSE连接错误:', event);
        };
    });
    </script>
</head>
<body>
<div id="messages">
    <!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>

三、启动项目

  1. 运行 Spring 项目
  2. 浏览器打开 index.html文件
  3. 调用发送消息接口
    curl http://localhost:8080/sse/send-message-gateway?userId=1&message=test0001
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/803247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity动画系统(3)---融合树

6.1 动画系统基础2-6_哔哩哔哩_bilibili Animator类 using System.Collections; using System.Collections.Generic; using UnityEngine; public class EthanController : MonoBehaviour { private Animator ani; private void Awake() { ani GetComponen…

【ECharts】使用 ECharts 处理不同时间节点的数据系列展示

使用 ECharts 处理不同时间节点的数据系列展示 在数据可视化中&#xff0c;我们经常遇到这样的问题&#xff1a;不同数据系列的数据点在时间轴上并不对齐。这种情况下&#xff0c;如果直接在 ECharts 中展示&#xff0c;图表可能会出现混乱或不准确。本文将通过一个示例代码&a…

解决VSCode自动识别文件编码

在VScode 的 设置界面 输入 autoGuess 关键字 &#xff0c;勾选启用即可自动识别&#xff01;&#xff01;&#xff01;

【Python与GUI开发】事件处理与打包分发

文章目录 前言 一、高级事件处理 1.自定义事件 2.拖放操作 3.复杂控件的事件处理 二、打包和分发 Tkinter 应用 1.PyInstaller 2.cx_Freeze 3.spec 文件 4.分发注意事项 三、实战示例&#xff1a;文件浏览器 总结 前言 在前面的讨论中&#xff0c;我们深入理解了 T…

Qt MV架构-委托类

一、基本概念 与MVC模式不同&#xff0c;MV视图架构中没有包含一个完全分离的组件来处理与用户的交互。 一般地&#xff0c;视图用来将模型中的数据显示给用户&#xff0c;也用来处理用户的输入。为了获得更高的灵活性&#xff0c;交互可以由委托来执行。 这些组件提供了输入…

每日一 练,java

目录 题目分析代码 题目 选自牛客网 1.小美的平衡矩阵 小美拿到了一个&#x1d45b;∗&#x1d45b;的矩阵&#xff0c;其中每个元素是 0 或者 1。 小美认为一个矩形区域是完美的&#xff0c;当且仅当该区域内 0 的数量恰好等于 1 的数量。现在&#xff0c;小美希望你回答有多…

电瓶车检测AI算法:视频智能分析技术助力电瓶车规范与安全管理

随着电瓶车&#xff08;电动自行车&#xff09;的普及&#xff0c;其在城市交通中扮演着越来越重要的角色。然而&#xff0c;电瓶车的管理、安全监控以及维护等方面也面临着诸多挑战。近年来&#xff0c;人工智能&#xff08;AI&#xff09;技术的发展为解决这些问题提供了新的…

网络开局 与 Underlay网络自动化

由于出口和核心设备 部署在核心机房,地理位置集中,业务复杂,开局通常需要网络工程师进站调测。 因此核心层及核心以上的设备(包含核心层设备,旁挂独立AC设备和出口设备)推荐采用WEB网管开局方式或命令行开局方式。 核心以下的设备(包含汇聚层设备、接入层设备和AP)由于数量众…

使用 exe4j 转换 Java jar 程序为 Windows 平台可执行文件 (.exe)

使用 exe4j 转换 Java jar 程序为 Windows 平台可执行文件 &#xff08;.exe&#xff09; 介绍exe4j 特点&#xff1a;转换全过程&#xff08;软件操作&#xff09;1、注册2、选择模式3、配置应用4、选择执行的方式&#xff08;我这里管这个叫呈现方式&#xff09;5、选择 JAR …

6.Dockerfile及Dockerfile常用指令

Dockerfile是构建docker镜像的脚本文件 Dockerfile有很多的指令构成&#xff0c;指令由上到下依次运行。 每一条指令就是一层镜像&#xff0c;层越多&#xff0c;体积就越大&#xff0c;启动速度也越慢 井号开头的行是注释行。指令写大写写小写都行&#xff0c;但一般都写为…

Java SpringAOP简介

简介 官方介绍&#xff1a; SpringAOP的全称是&#xff08;Aspect Oriented Programming&#xff09;中文翻译过来是面向切面编程&#xff0c;AOP是OOP的延续&#xff0c;是软件开发中的一个热点&#xff0c;也是Spring框架中的一个重要内容&#xff0c;是函数式编程的一种衍生…

WEB前端05-JavaScrip基本对象

JavaScript对象 1.Function对象 函数的创建 //方法一&#xff1a;自定义函数 function 函数名([参数]) {函数体[return 表达式] }//方法二&#xff1a;匿名函数 (function([参数]) {函数体[return 表达式] }); **使用场景一&#xff1a;定义后直接调用使用(只使用一次) (fun…

《学会 SpringBoot · 定制 SpringMVC》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; 近期刚转战 CSDN&#xff0c;会严格把控文章质量&#xff0c;绝不滥竽充数&#xff0c;如需交流&#xff…

【持续集成_06课_Jenkins高级pipeline应用】

一、创建项目选择pipeline的风格 它主要是以脚本&#xff08;它自己的语言&#xff09;的方式进行运行&#xff0c;一般由运维去做的事情&#xff0c;作为测试而言。了解即可。 --- 体现形式全部通过脚本去实现&#xff1a;执行之前&#xff08;拉取代码&#xff09;执行&…

【JavaEE精炼宝库】 初识网络原理——网络通信基础 | 协议

文章目录 一、网络发展史1.1 独立模式&#xff1a;1.2 网络互连&#xff1a;1.3 局域网&#xff08;LAN&#xff09;&#xff1a;1.4 广域网&#xff08;WAN&#xff09;&#xff1a; 二、网络通信基础2.1 IP地址&#xff1a;2.2 端口号&#xff1a; 三、协议3.1 协议的概念&am…

[米联客-安路飞龙DR1-FPSOC] FPGA基础篇连载-18 I2C MASTER控制器驱动设计

软件版本&#xff1a;Anlogic -TD5.9.1-DR1_ES1.1 操作系统&#xff1a;WIN10 64bit 硬件平台&#xff1a;适用安路(Anlogic)FPGA 实验平台&#xff1a;米联客-MLK-L1-CZ06-DR1M90G开发板 板卡获取平台&#xff1a;https://milianke.tmall.com/ 登录“米联客”FPGA社区 ht…

【5G Sub-6GHz模块】专为IoT/eMBB应用而设计的RG520NNA、RG520FEB、RG530FNA、RG500LEU 5G模组

推出全新的5G系列模组&#xff1a; RG520NNADB-M28-SGASA RG520NNADA-M20-SGASA RG520FEBDE-M28-TA0AA RG530FNAEA-M28-SGASA RG530FNAEA-M28-TA0AA RG500LEUAA-M28-TA0AA ——明佳达 1、5G RG520N 系列——专为IoT/eMBB应用而设计的LGA封装模块 RG520N 系列是一款专为 IoT…

Ghost Browser指纹浏览器集成IPXProxy代理IP:解锁Twitch直播新体验

​Twitch 是一个实时视频流平台&#xff0c;允许人们实时播放各自的内容&#xff0c;无论是游戏、娱乐、体育、音乐还是其他内容。不少人的人都想要在Twitch直播来吸引更多的粉丝&#xff0c;然而有时候会面临无法成功使用Twitch的问题。本文将带来Ghost Browser指纹浏览器集成…

网络概念: 互联网和局域网、 OSI七层网络互联模型、数据封装、应用端口、地址解析、网络设备、网络配置

文章目录 引言I 网络概念1.1 互联网和局域网1.2 OSI七层网络互联模型1.3 数据封装1.4 TCP/IP协议1.5 应用端口II 地址解析III 网络设备3.1 集线器 HUB3.2 交换机 swich3.3 路由器 router3.4 防火墙 firewallIV 网络配置4.1 网络安全域(你住哪里?)4.2 地址转换(NAT,你名字叫…

Go 1.19.4 函数-Day 08

1. 函数概念和调用原理 1.1 基本介绍 函数是基本的代码块&#xff0c;用于执行一个任务。 Go 语言最少有个 main() 函数。 你可以通过函数来划分不同功能&#xff0c;逻辑上每个函数执行的是指定的任务。 函数声明告诉了编译器函数的名称&#xff0c;返回类型&#xff0c;和参…