实时通讯技术实现

实时通讯技术实现

前言

在CS架构中,经常会有实时通信的需求。客户端和服务端建立连接,服务端实时推送数据给客户端。本文介绍几种常见的实现方式,希望能给读者们一点点参考。

实时通讯的主要实现技术

  • 长轮询(Long Polling)
  • WebSocket
  • 服务器发送事件(Server-Sent Events, SSE)
  • XMPP (Extensible Messaging and Presence Protocol)
  • MQTT (Message Queuing Telemetry Transport)

长轮询

长轮询(Long Polling): 一种网络通信机制,用于实现客户端和服务器之间的实时数据传输。

Http长轮询机制:

原理图
原理图

长轮询工作原理:

  1. client端请求server端,并约定好超时时间;
  2. server端收到请求后,判断数据是否有变化:
    • 有变化:立即返回数据;
    • 没变化:则阻塞http请求,并且将长轮询请求任务放入队列中,然后开启任务调度,调度任务在长连接维持时间到期后,会将长轮询请求移除队列,并返回对应数据。
  3. 如果在挂起的这段时间内,数据有变化,服务器会移除队列中的长轮询请求,并响应数据给客户端。

长轮询优缺点:

优点:
  • 兼容性好
  • 实现简单
  • 即时性
缺点:
  • 服务器hold住连接,占用资源
  • 会有延迟,服务器响应后,客户端要重新发起连接(这段时间内有新消息不能即时触达)

Java 示例代码


@Controller
public class LongPollingController {

    private final Map<String, DeferredResult<String>> deferredResults = new ConcurrentHashMap<>();

    @GetMapping("/longpolling")
    @ResponseBody
    public Object longPolling() {
        DeferredResult<String> deferredResult = new DeferredResult<>(30000L, "time out");
        deferredResults.put("key", deferredResult); // 假设每个客户端有一个唯一的key
        return deferredResult;
    }

    @GetMapping("/push")
    public void push() {
        // 模拟异步数据获取
        deferredResults.get("key").setResult("data update"); // 当数据准备好时,触发长轮询
    }
}

DeferredResult 是 Spring MVC 提供的一种用于处理异步请求的机制,它允许在处理请求时延迟产生结果,并且允许在处理请求的不同线程中生成结果。DeferredResult 可以用于异步处理 HTTP 请求,并在处理完成后返回结果给客户端。

WebSocket

WebSocket 是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术。WebSocket 基于 TCP 双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息。

WebSocket原理图:

alt

WebSocket特点:

  1. 单一的TCP连接,采用全双工模式通信;
  2. 对代理、防火墙和路由器透明;
  3. 无头部信息、Cookie和身份验证;
  4. 无安全开销;
  5. 通过“ping/pong”帧保持链路激活;
  6. 服务器可以主动传递消息给客户端,不再需要客户端轮询。

示例代码(netty实现websocket通信)

WebSocket服务端启动类:


@Component
@Slf4j
public class WebSocketServer {

    /**
     * webSocket协议名
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * 端口号
     */
    @Value("${webSocket.netty.port:58080}")
    private int port;

    /**
     * webSocket路径
     */
    @Value("${webSocket.netty.path:/webSocket}")
    private String webSocketPath;

    @Autowired
    private WebSocketHandler webSocketHandler;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;

    /**
     * 启动
     *
     * @throws InterruptedException
     */
    private void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        // bossGroup负责客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
        bootstrap.group(bossGroup, workGroup);
        // 设置NIO类型的channel
        bootstrap.channel(NioServerSocketChannel.class);
        // 设置监听端口
        bootstrap.localAddress(new InetSocketAddress(port));
        // 连接到达时会创建一个通道
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 流水线管理通道中的处理程序(Handler),用来处理业务
                // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast(new ObjectEncoder());
                // 以块的方式来写的处理器
                ch.pipeline().addLast(new ChunkedWriteHandler());
                //将收到的 HTTP 请求或响应的多个部分合并成一个完整的对象
                ch.pipeline().addLast(new HttpObjectAggregator(8192));
                /*
                说明:
                1、对应webSocket,它的数据是以帧(frame)的形式传递
                2、浏览器请求时 ws://localhost:58080/xxx 表示请求的uri
                3、核心功能是将http协议升级为ws协议,保持长连接
                */
                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
                ch.pipeline().addLast(new HeartBeatHandler());
                // 自定义的handler,处理业务逻辑
                ch.pipeline().addLast(webSocketHandler);

            }
        });
        // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
        ChannelFuture channelFuture = bootstrap.bind().sync();
        log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
        // 对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
    }

    /**
     * 释放资源
     *
     * @throws InterruptedException
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }

    @PostConstruct
    public void init() {
        //需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

业务逻辑处理器:


@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 接收到 WebSocket 文本消息
        System.out.println("Received message: " + msg.text());
        // 响应 WebSocket 文本消息
        ctx.writeAndFlush(new TextWebSocketFrame("Received your message: " + msg.text()));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 异常处理
        cause.printStackTrace();
        ctx.close();
    }
}

SSE

SSE(Server-Sent Events,服务器发送事件)是一种用于实现服务器向客户端单向推送数据的技术。它允许服务器端在任何时候发送数据到客户端,而客户端不需要发起请求。SSE 基于 HTTP 协议,使用简单的文本格式进行通信,通常被用于实时更新网页内容、实时通知等场景。

SSE 的工作原理如下:

  1. 客户端向服务器发送一个 HTTP 请求,请求的头部包含 Accept: text/event-stream 表示接受 SSE 格式的响应。
  2. 服务器接收到请求后,保持连接打开,并在连接上周期性地发送消息给客户端。每个消息都以 data: 开头,并以两个换行符 \n\n 结束。
  3. 客户端接收到消息后,将其通过事件监听器处理。
SSE原理图:
alt

代码示例


@Controller
public class SSEController {

    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> sse() {
        return Flux.interval(Duration.ofSeconds(10))
                .map(sequence -> ServerSentEvent.<String>builder()
                        .id(String.valueOf(sequence))
                        .event("message")
                        .data("Hello SSE - " + LocalTime.now())
                        .build());
    }
}

Postman 响应示例:

alt

长轮询、WebSocket、SSE 对比

1. 服务器资源消耗:
  • 长轮询:服务器需要为每个客户端请求保持一个开放的连接,直到有数据发送。这导致服务器资源(如内存和连接槽)的消耗,特别是在高并发场景下。
  • WebSocket:建立后,WebSocket 提供了一个持久的、全双工的连接通道。虽然它也占用服务器资源,但由于其连接是持久的,所以不需要频繁地创建和销毁连接,相对于长轮询,这可以减少资源消耗和延迟。
  • SSE:SSE 也保持开放的连接,但只支持单向通信(服务器到客户端)。与长轮询相比,SSE 通过减少连接的建立和销毁次数来优化资源使用,但对于每个客户端,它仍然占用一个连接。
2. 网络延迟和效率:
  • 长轮询:每次请求可能在服务器有数据可发送之前保持打开状态,这可能导致网络延迟。
  • WebSocket:一旦建立,消息可以几乎无延迟地在客户端和服务器之间传输,提高了效率和实时性。
  • SSE:由于连接持续开放,SSE 可以实现低延迟的服务器到客户端消息传输,但不支持客户端到服务器的实时通信。
3. 实现复杂性:
  • 长轮询:相对简单,不需要特殊的协议支持,但服务器端需要逻辑来管理多个持续的请求。
  • WebSocket:需要在客户端和服务器端实现WebSocket协议,比长轮询实现复杂,连接的建立、错误的处理和断开连接时的重连等机制都需要考虑。
  • SSE:客户端实现相对简单,主要的复杂性在于服务器端,需要支持HTTP/1.1的持久连接。

XMPP

XMPP (Extensible Messaging and Presence Protocol) 是一个支持消息传递和状态显示的开放即时通讯协议。它实现了客户端与服务器之间的双向通信,并可以通过扩展以适应多样的即时通讯服务需求。基于 XML (Extensible Markup Language) 和 TCP/IP 协议构建,XMPP 特点包括灵活性、可扩展性和分布式架构。

XMPP设计的网络结构中定义了3类通信实体:

  • 客户端
  • 服务器
  • 网关

XMPP中基本的通信基于传统的CS模式,即客户端通过TCP/IP连接到服务器,然后通过传输XML流进行通信。

XMPP的系统原理图:

网图
网图

MQTT

MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息协议,专为低带宽和不可靠网络环境设计,广泛应用于物联网(IoT)、移动应用等场景。基于发布/订阅模型,它允许设备发布消息到主题,同时允许其他设备订阅这些主题以接收消息。MQTT运行于TCP/IP协议之上,提供了一种简单有效的方式来进行设备间的通信。

MQTT原理图:

alt

MQTT 和 XMPP 都需要额外的服务器来进行消息通信,这些服务器通常被称为 MQTT 代理(broker)和 XMPP 服务器。

总结

MQTT 和 XMPP 需要中间服务器(分别是 MQTT 代理和 XMPP 服务器)来处理消息的路由、传递和存储,增加了部署的复杂性,并需要确保中间件服务的高可用性。

相比之下,SSE(Server-Sent Events)和 WebSocket 提供了更直接的通信方式,允许服务器和客户端之间建立持久的连接。这两种技术直接基于现有的 HTTP/HTTPS 协议,可以利用现有的 Web 服务器架构进行部署,从而减少了额外的中间件需求。

在复杂性方面,MQTT 和 XMPP 要求开发者具备对相应协议的深入了解,相较于 SSE 和 WebSocket 的简单 API 来说,实现起来会相对复杂,但这些协议也提供了更丰富的灵活性。

总的来说,选择合适的技术实现取决于业务需求和现有架构,各种技术都有其适用的场景。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/492127.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

搜索(find a way, Pots)

Find a way 思路&#xff1a;这一题看似简单其实不然&#xff0c;有很多特办条件&#xff0c;如果当这个M和Y都在KFC的时候就会导致步数为0 &#xff0c;或者可以这样说&#xff1a;只有两人都能到达才能算入答案。 我们可以使用bfs来写这道题&#xff0c;这不过这道题目不需…

ChatGPT,来一份3·28雷布斯米时捷上市发布会即时发言稿

你新招了一个秘书。上班第一天&#xff0c;你对他说&#xff1a;“3月28号我可能会受邀参加雷老板的米时捷’上市发布会&#xff0c;届时我可能会有十分钟的发言机会&#xff0c;你现在准备一篇演讲稿。” 秘书问你有何指导意见&#xff1f; 你自己都不知说啥子&#xff0c;能…

探秘 RabbitMQ 的设计理念与核心技术要点

目录 一、消息中间件介绍 1.1 消息中间件的作用 二、RabbitMQ 2.1 核心概念 2.2 生产者发送消息过程 2.3 消费者接收消息过程 2.4 RabbitMQ 为何要引入信道(channel) 2.5 消费模式 一、消息中间件介绍 消息队列中间件&#xff08;message queue middleWare, MQ&#xff09;指…

前端发版上线出现白屏问题

目录 路由配置问题资源缓存问题首屏加载过慢 &#xff1a;喂&#xff0c;你的页面白啦&#xff01; 出现上线白屏的问题有很多&#xff0c;如&#xff1a;配置错误、缓存问题、浏览器兼容问题&#xff0c;根据不同情况去解决。 路由配置问题 问题描述&#xff1a; 在vue开发…

ValueError: Cannot load file containing pickled data when allow_pickle=False

问题描述 遇到报错&#xff1a;ValueError: Cannot load file containing pickled data when allow_pickleFalse 解决方案 经过查阅有人说是与numpy的版本有关&#xff0c;但是还是不要轻易改变环境中的版本&#xff0c;不一定哪个地方就会报错。这里放个解决方案&#xff1a;…

【详细讲解yarn的安装和使用】

&#x1f308;个人主页:程序员不想敲代码啊&#x1f308; &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家&#x1f3c6; &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提…

探索MongoDB:发展历程、优势与应用场景

一、MongoDB简介 MongoDB 始于 2007 年&#xff0c;由 Dwight Merriman、Eliot Horowitz 和 Kevin Ryan –&#xff08;DoubleClick 的主理团队&#xff09;共同创立。 DoubleClick 是一家互联网广告公司&#xff08;现隶属于 Google&#xff09;&#xff0c;公司团队开发并利…

数字化转型的密码:传统企业转型路径的深度剖析

引言&#xff1a;数字经济浪潮下的新资产形态 传统行业&#xff0c;如零售、金融、教育等&#xff0c;在如今这场数字化浪潮中&#xff0c;同样需要积极拥抱变革&#xff0c;探索适合自身的转型之路。本文将结合制造业的转型经验&#xff0c;深入探讨传统行…

电脑开机0x0000007B蓝屏怎么办?

电脑开机0x0000007B蓝屏怎么办啊?相信很多用户的电脑都有遇到过蓝屏的问题,最近有用户电脑一开机就蓝屏,并且显示0x0000007B错误代码,原本想通过安全模式进行修复,结果发现安全模式进不去,不知道该怎么解决。这可能与我们的内存或硬盘有关,尝试设置一下硬盘模式,看看是…

嵌入式软件工程师都需要安装哪些软件

文章目录 一、编程软件1.keil2.vscode①Chinese&#xff1a;中文②C/C、C/C Extension Pack③CMake、CMake Tools等代码调试运行的工具④Remote-SSH等&#xff0c;关于远程登录linux服务器的插件 3.Pycharm和Anaconda&#xff0c;用来写python脚本和配置环境&#xff0c;PYQT上…

【Python】搭建 Python 环境

目 录 一.安装 Python二.安装 PyCharm 要想能够进行 Python 开发&#xff0c;就需要搭建好 Python 的环境 需要安装的环境主要是两个部分&#xff1a; 运行环境: Python开发环境: PyCharm 一.安装 Python (1) 找到官方网站 (2) 找到下载页面 选择 “Download for Windows”…

[蓝桥杯 2022 省 A] 求和

[蓝桥杯 2022 省 A] 求和 题目描述 给定 n n n 个整数 a 1 , a 2 , ⋯ , a n a_{1}, a_{2}, \cdots, a_{n} a1​,a2​,⋯,an​, 求它们两两相乘再相加的和&#xff0c;即 S a 1 ⋅ a 2 a 1 ⋅ a 3 ⋯ a 1 ⋅ a n a 2 ⋅ a 3 ⋯ a n − 2 ⋅ a n − 1 a n − 2 ⋅ a…

C语言:动态内存管理(malloc,calloc,realloc,free)

目录 前言 malloc函数 free函数 calloc函数 realloc函数 前言 在这一章节将讲解动态内存分配&#xff0c;它可以在程序的堆区创建一块内存&#xff0c;在这块内存中存什么值就是由自己决定的了 开辟的空间有两个特点&#xff1a; 1. 空间开辟的大小是固定的 2. 数组在…

零基础学习挖掘PHP网站漏洞

教程介绍 本套课程&#xff0c;分为三个阶段&#xff1a;第一阶段&#xff1a;基础篇 学习PHP开发的基础知识&#xff0c;对PHP常见的漏洞进行分析&#xff0c;第二阶段&#xff1a;进阶篇 实战PHP漏洞靶场&#xff0c;了解市面上的PHP主流网站开发技术&#xff0c;并对市面上…

JAVA获取免费天气

JAVA调用天气代码示例 前沿&#xff1a;最近在开发任务中需要获取每日的实时天气和天气预报&#xff0c;要求还是免费的。在网络上搜索了一下免费的API并有了以下思路 免费API网址&#xff1a;https://dev.qweather.com/docs/api/grid-weather/grid-weather-now/ 调用格林天…

工程企业的未来选择:Java版工程项目管理系统平台与数字化管理的融合

在现代化的工程项目管理中&#xff0c;一套功能全面、操作便捷的系统至关重要。本文将介绍一个基于Spring Cloud和Spring Boot技术的Java版工程项目管理系统&#xff0c;结合Vue和ElementUI实现前后端分离。该系统涵盖了项目管理、合同管理、预警管理、竣工管理、质量管理等多个…

基于Colab训练的yolov4-tiny自定义数据集(可用于OpenCV For Unity)

参考资料文档和视频。 1.打开文档,点击【文件】【在云端硬盘中保存一份副本】,即将文档复制到自己云端硬盘。 2.打开该文件,按文中提示进行。 【代码执行程序】【更改运行时类型】修改运行时为GPU(免费的GPU不好用,收费的好用,某宝上几十元就可用一个月) 步骤1) !git…

大数据面试题 —— Kafka

目录 消息队列 / Kafka 的好处消息队列的两种模式什么是 KafkaKafka 优缺点你在哪些场景下会选择 Kafka讲下 Kafka 的整体结构Kafka 工作原理 / 流程Kafka为什么那么快/高效读写的原因 / 实现高吞吐的原理生产者如何提高吞吐量&#xff08;调优&#xff09;kafka 消息数据积压&…

python爬虫-----输入输出与流程控制语句(第四天)

&#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; &#x1f388;&#x1f388;所属专栏&#xff1a;python爬虫学习&#x1f388;&#x1f388; ✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天…

力扣:字母迷宫,python

这里写自定义目录标题 问题描述题解踩坑记录global和nonlocal关键字的区别&#xff1a;类中可以用实例变量替换全局变量 问题描述 字母迷宫游戏初始界面记作 m x n 二维字符串数组 grid&#xff0c;请判断玩家是否能在 grid 中找到目标单词 target。 注意&#xff1a;寻找单词…