【分布式websocket】群聊中的各种难点以及解决推拉结合【第16期】

前言

群聊中未读消息如何设计,以及是推消息还是拉去消息如何选择是需要讨论的。推送消息是推送全量消息还是推送信号消息让客户端再去拉取。其中方案如何选型会比较纠结。
首先基本的推拉结合思路是在线用户推送消息。用户离线的话上线去拉取消息。这是简单的推拉结合。问题在于群聊消息的特点有发送消息比较频繁。假设群里面有一秒钟发送了100条消息。如果推送的话一个人需要推送一百次。但是拉取的话只需要一次就可以拉取所有消息。但是通过 信号消息这样的方案使得设计非常复杂。详情看这篇文章。
目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期(废弃))】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】
微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】
微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】
【分布式websocket】RocketMQ发送消息保证消息最终一致性需要做哪些处理?【第15期】
B站上面关注我呐 B站和CSDN同名,B站1000粉丝后建群。然后B站关注我后可以私信CSDN来的,然后后面我建群的时候拉你!

群聊中消息已读未读如何设计

数据模型设计

在群聊系统中,管理未读消息的两种常见方法是:记录每个用户与每条消息之间的已读/未读状态,以及记录用户的最后一次阅读消息ID。每种方法都有其优缺点,适用于不同的场景。
结论
如果你的系统需要精确跟踪每条消息的阅读状态,或者需要支持复杂的消息阅读状态查询,可以选择记录每个用户与每条消息之间的已读/未读状态。
如果你的系统更注重性能和可扩展性,或者只需要基本的未读消息功能,记录用户的最后一次阅读消息ID是一个更高效的选择。
通常,考虑到性能和实现的复杂度,许多现代的群聊系统倾向于使用记录最后一次阅读消息ID的方法。这种方法能够满足大多数场景的需求,同时保持系统的高效和简洁。
我们采用 记录用户的最后一次阅读消息ID。

CREATE TABLE yan_im_read(
    user_id VARCHAR(255) NOT NULL,
    group_id VARCHAR(255) NOT NULL,
    last_read_message_id BIGINT,
	count int
    
);

具体未读这块会在下一篇离线消息设计中说明
设计思路
功能实现

  1. 发送消息:
    当用户发送消息时,将消息存入消息表,并为每个接收者在用户-会话关系表中的未读消息数加一。
    对于群聊,为群内每个成员(除了发送者)的未读消息数加一。
  2. 阅读消息:
    当用户打开一个聊天窗口时,系统将该会话的未读消息数重置为0,并更新最后阅读时间或最后阅读的消息ID。
    同时,前端展示未读消息数,并将其清零。
  3. 查询未读消息数:
    用户登录或在主界面时,系统查询用户-会话关系表,获取每个会话的未读消息数,以及总的未读消息数。
    这些信息用于在用户的聊天列表中显示每个聊天窗口的未读消息数,以及应用图标上显示的总未读数。

群聊中消息推送模型采用推送还是用户自己拉取

** 模式一 推送模式(Push)**
在推送模式下,当有新消息时,服务器主动将消息推送给客户端。这种模式可以实现实时通信,用户体验较好。
优点:
实时性:用户可以即时接收到消息,无需主动查询,提高了通信的实时性。
减轻客户端负担:客户端无需定时向服务器发送查询请求,减少了网络请求和资源消耗。
缺点:
服务器负担较重:需要服务器跟踪每个客户端的连接状态,并实时推送消息。
可能存在消息丢失:在网络不稳定或客户端离线时,推送的消息可能会丢失。
模式二 拉取模式(Pull)

  • 在拉取模式下,客户端定时向服务器发送请求,查询是否有新消息。如果有,客户端再拉取这些新消息。
  • 优点:
    简单可靠:客户端主动拉取,可以根据需要重试,减少了消息丢失的风险。
    服务器处理简单:服务器不需要跟踪客户端的连接状态,只需响应客户端的请求。
  • 缺点:
    延迟:用户接收到消息的速度取决于拉取的频率,可能无法做到实时通信。
    增加客户端负担:客户端需要定时发送请求,增加了网络请求和资源消耗。

模式三 推送通知后客户端拉取消息
在这种策略中,当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。
优点:
减轻了服务器推送的压力,尤其是在高并发场景下。
更灵活,可以根据客户端的实际情况(如网络状况、用户设置等)决定是否拉取新消息。
方便实现对离线消息的处理,客户端上线后可以主动拉取期间的所有新消息。
缺点:
实时性略差,用户收到消息有一定的延迟。
客户端逻辑更复杂,需要实现拉取新消息的逻辑。

结合使用
在实际应用中,为了兼顾实时性和系统资源的有效利用,往往会结合推送和拉取两种模式:

  • 推送+拉取:对于实时性要求高的消息,如即时聊天消息,采用推送模式,确保用户能够及时收到。对于实时性要求不高的信息,如离线消息或通知,可以在用户上线时通过拉取模式获取。
  • 状态同步:使用推送模式进行实时消息通信,同时,客户端在特定情况下(如启动、网络恢复等)主动拉取最新状态,以确保没有遗漏的消息。
  • 根据消息优先级去选择推送或者拉取:还可以根据消息的优先级和类型,选择不同的推送策略。对于重要或紧急的消息,可能采用直接推送的方式;而对于普通消息,则采用通知加拉取的方式
  • 注意事项
    推送策略:在推送模式下,需要合理设计推送策略,比如使用消息队列管理待推送消息,以应对高并发场景。
    拉取策略:在拉取模式下,需要考虑合理的拉取频率,避免过于频繁导致的资源浪费,或过于稀疏导致的实时性不足。
    用户体验:在设计消息推送模型时,应考虑到用户体验,提供稳定、可靠、及时的消息服务。

实时推送的优化策略

1.消息队列优化
利用消息队列来管理消息的发送。当有新消息时,先将消息发送到消息队列中,然后使用消费者服务批量从队列中取出消息进行处理和推送。这种方式可以有效地平衡负载,提高消息处理的效率。
2.分批推送
对于大群聊,一次性向所有成员推送可能会导致服务器压力过大。可以将群成员分批,每批包含一定数量的用户,然后逐批推送。这样既可以减轻服务器压力,又可以避免网络拥塞。
3.WebSocket连接池
对于基于WebSocket或长连接的推送方式,使用连接池来管理和复用连接。这样可以减少频繁建立和断开连接的开销,提高推送效率。
4. 监控和调优
持续监控推送系统的性能指标,如推送延迟、失败率等。根据监控结果调整批处理大小、分批策略和资源分配,以达到最优的推送效率。

MQ批量消费逻辑

先说明一下消费什么。批量消费可以解决什么问题。在之前的消息链路消息有落库的环节。
在这里插入图片描述

落库的话需要监听mq然后消费这条消息进行落库。这里采用的策略是批量落库。避免发送一条消息保存一条消息这样的频繁访问数据库。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
@Service
@RocketMQMessageListener(topic = "yourTopic", consumerGroup = "yourConsumerGroup", consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 10, messageModel = MessageModel.CLUSTERING)
public class MyBatchConsumerService implements RocketMQListener<List<MessageExt>> {

    @Override
    public void onMessage(List<MessageExt> messages) {
        for (MessageExt message : messages) {
            // 处理每条消息
            System.out.println(new String(message.getBody()));
        }
        // 实现批量处理逻辑
    }
}

配置消费者属性
在application.properties或application.yml中配置消费者的属性,特别是consumeMessageBatchMaxSize,这个属性决定了消费者每次批量拉取处理的消息最大数量。

rocketmq:
  consumer:
    consumeMessageBatchMaxSize: 10

如果需要更细致地控制消费者的配置,可以通过编程方式自定义消费者。这通常涉及到创建DefaultMQPushConsumer的实例,并设置相关属性。

@Configuration
public class RocketMQConsumerConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    @Bean
    public DefaultMQPushConsumer batchConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeMessageBatchMaxSize(10); // 设置批量消费的大小
        consumer.subscribe("YourTopic", "*"); // 订阅主题
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // 实现批量消息处理逻辑
            msgs.forEach(msg -> {
                System.out.println(new String(msg.getBody()));
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        return consumer;
    }
}

. 注意事项

  • 消息大小限制:确保批量消费的总消息大小不超过RocketMQ的限制(默认不超过4MB)。
  • 异常处理:在批量处理消息时,确保有适当的异常处理机制。如果批量中的某个消息处理失败,需要决定是重试整个批次还是仅重试失败的消息。
  • 性能测试:在实际部署前,进行充分的性能测试,以确定最优的consumeMessageBatchMaxSize值。过大或过小的批量大小都可能影响消费效率。
    通过上述配置和建议,可以在Spring Boot应用中有效地实现RocketMQ的批量消费,提高消息处理的效率和应用性能。

介绍群聊模式三

当群聊中有新消息时,服务器不直接发送消息内容,而是发送一个有新消息的通知给群成员,由客户端在收到通知后主动向服务器拉取新消息。这种模式实现起来暂时有点复杂。先按照简单的推拉结合处理。但是可以保留着思路,然后后面解决。
方案概述

  1. 信号消息推送:当有新消息发送到群聊时,服务器不会直接将完整的消息内容推送给所有群成员。相反,它只发送一个信号消息,通知客户端有新消息可用。
  2. 客户端拉取消息:收到信号消息后,客户端会主动向服务器发起请求,拉取自上次更新以来的所有新消息。
    方案优点
    减少带宽消耗:由于不是所有的消息内容都通过推送发送,这种方案可以显著减少网络带宽的消耗。
    提高效率:客户端可以根据实际需要批量拉取消息,减少网络请求的次数,提高数据同步的效率。
    保证消息完整性:通过拉取机制,即使在网络不稳定的情况下,客户端也能确保最终获取到所有的消息,避免消息丢失。
    支持离线消息:当用户离线时,信号消息可以被服务器暂存,用户上线后再拉取所有未读消息,保证消息的完整同步。
    方案实现要点

信号消息设计:信号消息应包含足够的信息,以便客户端知道从哪里开始拉取新消息。例如,可以包含最新消息的ID或时间戳。

消息存储:服务器需要有效地存储和管理消息,以支持高效的消息拉取操作。通常需要对消息进行索引,以便快速查询到新消息。
拉取策略:客户端可以实现智能的拉取策略,例如,在用户查看群聊时主动拉取新消息,或者在收到多个信号消息时合并请求,减少服务器的负载。
错误处理和重试机制:为了保证消息的完整性,客户端在拉取消息时应该实现错误处理和重试机制,确保在网络不稳定时也能成功获取消息。
适用场景
这种结合推送和拉取的方案非常适合消息量大、用户基数广的群聊系统,尤其是在需要优化网络资源消耗、保证消息可靠传递的场景下。同时,这种方案也适用于需要支持离线消息同步的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/457356.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WPF —— TabControl、StackPanel 控件详解

1 TabControl简介 表示包含多个项的控件&#xff0c;这些项共享屏幕上的同一空间。 TabControl有助于最大程度地减少屏幕空间使用量&#xff0c;同时允许应用程序公开大量数据。 TabControl包含共享同一屏幕空间的多个 TabItem 对象。一次只能看到 TabControl 中的一个 Ta…

交换机/路由器的存储介质-华三

交换机/路由器的存储介质-华三 本文主要介绍网络设备的存储介质组成。 ROM(read-only memory&#xff0c;只读存储器) 用于存储 BootROM程序。BootROM程序是一个微缩的引导程序&#xff0c;主要任务是查找应用程序文件并引导到操作系统&#xff0c;在应用程序文件或配置文件出…

AJAX 01 AJAX 概念和 axios 使用

2.27 AJAX 学习 AJAX 1 入门01 AJAX 概念和 axios 使用axios 使用案例 02 认识 URLURL组成 03 URL 查询参数axios&#xff0d;查询参数案例 &#xff1a;地区查询 04 常用请求方法和数据提交axios 请求配置axios 错误处理 05 HTTP协议-报文① 请求报文作用&#xff1a;错误排查…

uniapp微信小程序_自定义交费逻辑编写

一、首先看最终效果 先说下整体逻辑,选中状态为淡紫色,点击哪个金额,充值页面上就显示多少金额 二、代码 <view class"addMoney"><view class"addMoneyTittle">充值金额</view><view class"selfaddmoney" :class"{…

力扣日记3.14-【贪心算法篇】376. 摆动序列

力扣日记&#xff1a;【贪心算法篇】376. 摆动序列 日期&#xff1a;2024.3.14 参考&#xff1a;代码随想录、力扣 376. 摆动序列 题目描述 难度&#xff1a;中等 如果连续数字之间的差严格地在正数和负数之间交替&#xff0c;则数字序列称为 摆动序列 。第一个差&#xff08;…

【总结】服务器无法连接外网,设置http代理解决

问题 某天想要在服务器上下载编译github上某开源项目&#xff0c;结果发现访问不了外网。 于是找运维&#xff0c;运维给了个http代理服务器地址。简单操作后&#xff0c;就可以访问外网了。 解决 在需要访问外网的机器上&#xff0c;执行以下命令&#xff1a;http_proxyhtt…

rust学习(简单链表)

编写一个简单链表&#xff0c;主要遇到的问题就是next指针&#xff08;按照C的写法&#xff09;的数据如何定义。按照网上的建议&#xff0c;一般定义如下&#xff1a; struct Node {pub value:u32,pub next:Option<Rc<RefCell<Node>>>, //1 }1.用Option主要…

GoLang:云原生时代致力于构建高性能服务器的后端语言

Go语言的介绍 概念 Golang&#xff08;也被称为Go&#xff09;是一种编程语言&#xff0c;由Google于2007年开始设计和开发&#xff0c;并于2009年首次公开发布。Golang是一种静态类型、编译型的语言&#xff0c;旨在提供高效和可靠的软件开发体验。它具有简洁的语法、高效的编…

外卖小程序-购物车模块表结构设计和后端代码

表结构设计 添加购物车代码 Service public class ShoppingCartServiceImpl implements ShoppingCartService {Autowiredprivate ShoppingCartMapper shoppingCartMapper;Autowiredprivate DishMapper dishMapper;Autowiredprivate SetmealMapper setmealMapper;/*** 添加购物…

在浏览器的控制台定义变量,清除后还是报错变量已声明

报错&#xff1a;Uncaught SyntaxError: Identifier words has already been declared 在浏览器的控制台&#xff08;Console&#xff09;中定义的变量是全局变量&#xff0c;它们会保留在当前的浏览器窗口或标签页的生命周期中。即使你清除了控制台的内容&#xff08;例如通过…

[云原生] Prometheus自动服务发现部署

一、部署服务发现 1.1 基于文件的服务发现 基于文件的服务发现是仅仅略优于静态配置的服务发现方式&#xff0c;它不依赖于任何平台或第三方服务&#xff0c;因而也是最为简单和通用的实现方式。 Prometheus Server 会定期从文件中加载 Target 信息&#xff0c;文件可使用 YAM…

基于Java的海南旅游景点推荐系统(Vue.js+SpringBoot)

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户端2.2 管理员端 三、系统展示四、核心代码4.1 随机景点推荐4.2 景点评价4.3 协同推荐算法4.4 网站登录4.5 查询景点美食 五、免责说明 一、摘要 1.1 项目介绍 基于VueSpringBootMySQL的海南旅游推荐系统&#xff…

使用stream流合并多个List(根据实体类特定属性合并)

开发情景 现有多个List集合,其中都是一样的实体类,这里我想根据实体类的特定属性将它们合并在一起,形成一个最终的List集合。 这里主要用到了Stream流的flatMap方法与reduce方法。 flatMap:可以将多个Stream流合并在一起,形成一个Stream流。 reduce:可以将Stram流中的元…

Oracle登录错误ERROR: ORA-01031: insufficient privileges解决办法

这个问题困扰了我三个星期&#xff0c;我在网上找的解决办法&#xff1a; 1.控制面板->管理工具->计算机管理->系统工具->本地用户和组->ORA_DBA组。 但我电脑上根本找不到。 2.在oracle安装目录下找到oradba.exe运行。 最开始我都不到这个oradba.exe文件在哪…

Java基于微信小程序的校园订餐小程序的研究与实现,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Linux本地部署开源AI的PDF工具—Stirling PDF并实现公网随时访问

文章目录 1. 安装Docker2. 本地安装部署StirlingPDF3. Stirling-PDF功能介绍4. 安装cpolar内网穿透5. 固定Stirling-PDF公网地址 本篇文章我们将在Linux上使用Docker在本地部署一个开源的PDF工具——Stirling PDF&#xff0c;并且结合cpolar的内网穿透实现公网随时随地访问。 S…

PBKDF2算法:保障密码安全的利器

title: PBKDF2算法&#xff1a;保障密码安全的利器 date: 2024/3/14 16:40:05 updated: 2024/3/14 16:40:05 tags: PBKDF2算法密码安全性迭代盐值密钥 PBKDF2算法起源&#xff1a; PBKDF2&#xff08;Password-Based Key Derivation Function 2&#xff09;算法是一种基于密码…

Pycharm / idea上传项目到 Github 报错

报错内容: gitgithub.com: Permission denied (publickey).翻译–>gitgithub.com:权限被拒绝(公钥). 出现上述报错的原因:   客户端与服务端的ssh key不匹配   客户端与服务端未生成 ssh key 登录上Github查看ssh key是否存在&#xff0c;如果存在&#xff0c;那么可以对…

机器学习-0X-神经网络

总结 本系列是机器学习课程的系列课程&#xff0c;主要介绍机器学习中神经网络算法。 本门课程的目标 完成一个特定行业的算法应用全过程&#xff1a; 懂业务会选择合适的算法数据处理算法训练算法调优算法融合 算法评估持续调优工程化接口实现 参考 机器学习定义 关于机…

手写简易操作系统(八)--特权级以及TSS

前情提要 我们在这里梳理一下上面几节讲的内容 首先是计算机开机&#xff0c;BIOS接过第一棒&#xff0c;将第一个扇区MBR的内容导入到内存 0x7c00 的位置。 然后就是MBR中我们自己写的内容&#xff0c;将Loader导入到 0x600 的地址&#xff0c;Loader设置了GDT&#xff0c;…