消息存储与同步策略设计

消息存储与同步策略

https://github.com/robinfoxnan/BirdTalkServer

思路:

  • 私聊写扩散,以用户为中心,存储2次;
  • 群聊读扩散,以群组为中心,存储一次;
  • scylladb易于扩展,适合并发,但是并不适合搜索;如果需要针对聊天记录在服务端搜索的功能,可能还需要加上ES,以会话为中心存储一份;

存储的三级结构如下:

在这里插入图片描述

私聊

优点:以用户为中心比以会话为中心(tinode)的好处就是消息管理更加容易;每个用户的数据相对集中,可以快速的找到并一次性同步给客户;

缺点:数据需要存储2份;

群聊

优点:群聊使用读扩散,存储数据量少;

缺点:读扩散,如果用户反复离线与上线,需要读取离线数据,对scylladb压力比较大;

所有类型的IM系统都有一个共同的难点:如何同步数据,不丢消息?

同步机制

私聊和群聊在正常情况下如果所有用户在线,服务器也不重启,那么很容易保证实时转发不丢消息。

之所以会同步起来比较复杂就在于:

1)用户离线不定长时间后,上线时需要同步消息,而消息可能会非常的多(大群);

2)支持多终端登录,某个终端长久未使用,上线后也需要同步消息;

其实WX在多终端登录同步数据这一点上做的挺差的;多终端在线,某个终端时而离线,就会无法同步到所有的数据;而其他的一些系统就好多了,后加入群聊的也能看到之前的对话;多终端同步也好多了。

用户离线后重新登录,需要与服务器同步消息,需要保证尽量不丢包;这里需要有一个合适的同步机制。

基本策略:

登录后,根据本地保存的消息记录,对比时间差;如果离线时间不久,优先使用正向加载(私聊手机端);如果离线时间久,或者是群聊,优先使用倒序加载;(私聊的电脑端长期未登录也需要倒序加载)

同步流程:

用户登录就绪后,分为3类情况:

1)私聊:客户端比对最后接收消息时间,如果小于1天,则可以尝试正向加载消息,向服务器提供该条msgId,直到同步到消息列表末尾(一般情况下一天的私聊数据也不会超过1000条);如果时间较久,应该向服务申请反向加载数据到msgId为止;如果是老用户的新终端,也应该反向加载数据,并在用户界面提示用户按需加载;

2)群聊:新用户加入群聊后,以及离线后再次登录,都需要倒序加载数据;(这是因为群聊数据量可能非常庞大,而且用户也不需要从最开始的消息开始阅读,可以根据需要适当加载)

3)服务器假死:集群情况下,服务器由于负载大,没有即时上报心跳状态,造成其他服务器没有即时发送转发的消息;服务器恢复后发现此状态,应该按比例断开部分客户端链接;未断开的用户也应该要求客户端重新同步离线数据;

详见第2节部分。

1. ScyllaDb存储

这里使用了一个snow雪花算法生成唯一的消息ID,使用高42比特来保存毫秒时间戳,12比特作为流水号,所以每个毫秒最多支持4096个流水号;

那么这个ID就可以代表时间了,所以我们可以用它来排序,或者得到时间;

1.1 传输结构

// 聊天存储的基本信息
message MsgChat {
  int64 msgId = 1;                // 消息的全网唯一标识,服务端使用雪花算法生成,因为客户端生成的不可靠
  int64 userId = 2;               // 用于存储的clusterKey,因为一份消息要存储2次,要转发,需要有这个字段

  int64 fromId = 3;              // 发送消息的用户 ID
  int64 toId = 4;                // 接收消息的用户 ID(对方的用户 ID)

  int64 tm = 5;                   // 消息的时间戳

  string devId = 6;               // 多设备登录时的设备 ID
  string sendId = 7;              // 用于确认消息的发送 ID

  ChatMsgType msgType = 8;        // 消息类型,建议使用枚举
  bytes data = 9;                 // 消息的内容,可以使用 bytes 存储二进制数据或文本数据

  MsgPriority priority = 10;      // 消息的优先级,建议使用枚举
  int64 refMessageId = 11;        // 引用的消息 ID,如果有的话

  ChatMsgStatus status = 12;      // 消息状态,建议使用枚举
  int64 sendReply = 13;           // 发送消息的回执状态
  int64 recvReply = 14;           // 接收消息的回执状态
  int64 readReply = 15;           // 已读状态的回执

  EncryptType encType = 16;       // 加密类型
  ChatType chatType = 17;         // p2p, group, system
  int32 subMsgType = 18;          // 传递给插件区分代码,插件都注册为整数类型,
  int64 keyPrint = 19;            // 秘钥指纹
}

在传输过程中,私聊和群聊的消息是共用的;

服务为了保存到数据库需要进行格式转化:

1.2 私聊

私聊是写扩散,所以需要在表中对每个人都写一次,区别在于uid1和uid2交换一次,pk肯定也是需要交换的

type PChatDataStore struct {
	Pk   int16 `db:"pk"`
	Uid1 int64 `db:"uid1"`
	Uid2 int64 `db:"uid2"`
	Id   int64 `db:"id"`
	Usid int64 `db:"usid"`
	Tm   int64 `db:"tm"`
	Tm1  int64 `db:"tm1"`
	Tm2  int64 `db:"tm2"`

	Io    int8   `db:"io"`  // 0=out, 1=in
	St    int8   `db:"st"`  // 0=normal, 1=送达,2阅读,
	Ct    int8   `db:"ct"`  // 0=p2p_plain, 1=system, 2=p2_encrypted,
	Mt    int8   `db:"mt"`  // 0=text, 1=pic, 2=
	Print int64  `db:"pr"`  // 秘钥哈希的低8字节作为指纹
	Ref   int64  `db:"ref"` // 引用
	Draf  []byte `db:"draf"`
}

对应的建表语句:

const cqlCreateTablePChat = `CREATE TABLE IF NOT EXISTS  chatdata.pchat (
			pk smallint,
			uid1 bigint, 
			uid2 bigint,
			id bigint,
			usid bigint,
			tm bigint,
			tm1 bigint,
			tm2 bigint,
			io tinyint,
			st tinyint,
			ct tinyint,
			mt tinyint,
			draf blob,
			pr  varint,
			ref varint,
			PRIMARY KEY (pk, uid1, id)
		)`

这里提供了如下几个函数:

// 写2次,首先是发方A,然后是收方B
func (me *Scylla) SavePChatData(msg *model.PChatDataStore, pk2 int) error

// 对发送方设置回执,收方不需要设置,这里提供了收方的参数,是为了兼容,以后也许也保存
func (me *Scylla) SetPChatRecvReply(pk1, pk2, uid1, uid2, msgId, tm1 int64) error
func (me *Scylla) SetPChatReadReply(pk1, pk2, uid1, uid2, msgId, tm2 int64)
func (me *Scylla) SetPChatRecvReadReply(pk1, pk2, uid1, uid2, msgId, tm1, tm2 int64) error

// 设置删除,不可逆
func (me *Scylla) SetPChatMsgDeleted(pk1, pk2, uid1, uid2, msgId int64) error

// 正向查找,如果从头开始查找,那么设置为littleId = 0
func (me *Scylla) FindPChatMsg(pk, uid, littleId int64, pageSize uint) ([]model.PChatDataStore, error) 

// 正序查找,设置边界范围
func (me *Scylla) FindPChatMsgForwardBetween(pk, uid, littleId, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

// 从最新的数据向前倒序查若干条
func (me *Scylla) FindPChatMsgBackward(pk, uid, pageSize uint) ([]model.PChatDataStore, error)

// 从某一点开始向之前的历史数据反向查找,即 所有小于bigId 的
func (me *Scylla) FindPChatMsgBackwardFrom(pk, uid, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

// 从当前最新开始向之前的历史数据反向查找,即 所有大于littlId 的
func (me *Scylla) FindPChatMsgBackwardTo(pk, uid, littleId int64, pageSize uint) ([]model.PChatDataStore, error)

// 向之前的历史数据反向查找
func (me *Scylla) FindPChatMsgBackwardBetween(pk, uid, littleId, bigId int64, pageSize uint) ([]model.PChatDataStore, error)

1.3 群聊

type GChatDataStore struct {
	Pk   int16 `db:"pk"`
	Gid  int64 `db:"gid"`
	Uid  int64 `db:"uid"`
	Id   int64 `db:"id"`
	Usid int64 `db:"usid"`
	Tm   int64 `db:"tm"`
	Res  int8  `db:"res"` // 保留
	St   int8  `db:"st"`  // 0=normal, 1=送达,2阅读,
	Ct   int8  `db:"ct"`  // 0=普通,1=广播
	Mt   int8  `db:"mt"`  // 0=text, 1=pic, 2=

	Print int64  `db:"pr"`  // 秘钥哈希的低8字节作为指纹
	Ref   int64  `db:"ref"` // 引用
	Draf  []byte `db:"draf"`
}

去掉了uid2和tm2, tm3 群聊的消息不保存回执,多次读,每个用户都自己去读;

const cqlCreateTableGChat = `CREATE TABLE IF NOT EXISTS  chatdata.gchat (
			pk smallint,
			gid bigint,
			uid bigint, 
			id bigint,
			usid bigint,
			tm bigint,
			res tinyint,
			st tinyint,
			ct tinyint,
			mt tinyint,
			draf blob,
			pr  varint,
			ref varint,
			PRIMARY KEY (pk, gid, id)
		)`

相关函数如下:

// 保存
func (me *Scylla) SaveGChatData(msg *model.GChatDataStore) error


// 设置删除,不可逆
func (me *Scylla) SetGChatMsgDeleted(pk, gid, msgId int64) error

// 倒序,反向历史数据方向查找,从最新的数据开始向前加载
func (me *Scylla) FindGChatMsgBackwardTo(pk, gid, littleId int64, pageSize uint) ([]model.GChatDataStore, error)

// 倒序,从bigId 向littleId方向去查找,限定一定的个数,如果无法覆盖边界,再来一次
func (me *Scylla) FindGChatMsgBackwardBetween(pk, gid, littleId, bigId int64, pageSize uint) ([]model.GChatDataStore, error)

消息的所有者,以及管理员可以设置删除消息,这里的删除等同于微信的撤回,而不是本地删除;

2. Redis缓存

2.1 群聊消息缓存

每个群组有一个list用于存储,左侧插入,默认1000条缓存,如果超过就会删除;

键名字类似:bsgmsg_1001

func (cli *RedisClient) GetGroupLatestMsg(gid, count int64) ([]string, error)
func (cli *RedisClient) GetGroupLatestMsgPage(gid, offset, count int64) ([]string, error)
func (cli *RedisClient) GetGroupLatestMsgCount(gid, count int64) (int64, error) 
func (cli *RedisClient) PushGroupMsg(gid int64, msg string)

群聊用户离线后,重新上线后,先发所收到的最后一条消息的msgId,如果每个用户上线都搜索数据库,那么会非常耗费数据库资源,所以先从redis将最近的100条数据返回给用户;

这样就有了一个新的问题,用户如何知道中间缺失了部分消息?那么需要有一个节省流量与资源的同步方式:

**原则:**用户每次登录后主动请求加载离线数据,收到数据后回执,如果不请求数据,则不保证数据的完整性,在线时仅仅推送

1)用户登录准备好收发消息后,服务端首先设置状态;

2)用户需要同步群消息时,先发一个群消息同步请求,里面携带收到的最后的群消息msgId;

2)服务器加载最近的所有的消息(redis群缓存里的), 推送之后,需要推送一条待加载数据,通知前边还有数据需要同步;

用户端的群消息存储sqlite如下:

序号msg_id状态
100001
100003
100005
1000017待加载
1000018
1000020
1000025
1000075
1000086

比如,此次登录后,服务器推送了[1000086, 1000075, 1000025, 1000020, 1000018]数据后,尾号17的条目就是服务器发送的通知,这个编号完全可以从前一个msd_id = 1000018 减一得到,意思是从这里向前加载;

客户端需要插入这样的一条数据,下次从本地加载时,发现有这样一条数据,证明需要从这个位置向前加载,

界面上显示 ”待加载“的提示按钮,用户可以选择继续向前查看,客户端发送新的查询请求,

收到新加载的数据后,如果msg_id的范围越过了这条标记,那么这条标记就可以删除了。

这里存在一种异步竞争的情况,可能丢失消息:

登录后同步协程发消息协程
1)检测到用户离线,不推送最新消息m
1) 用户结构建立后,标记在线
2)加载离线数据,推送离线数据
2) redis中插入m

需要将流程改变一下:使用锁或者原子操作atomic来设置和读取用户的状态

登录后同步协程发消息协程
1)保存数据库,并在redis插入最新的消息m
2.1) 发现A不在线,未推送m
1) 用户结构建立后,标记在线
2)加载离线数据,推送离线数据2.2) 发现A在线,直接推送m

这里就会有2种可能性,

2.2) 转发消息的协程发现用户在线,直接转发消息,此时会造成重复推送;

2.1) 转发协程虽然没有转发给用户,但是同步协程会加载离线数据;

这里队列中加载所有数据都需要收到用户确认回执后再删除;

然而这里还有一个问题,存入redis队列中的消息,是使用protobuf定义的结构序列化,或者使用model.GChatDataStore结构序列化为JSON保存好;从效率上说,应该是protobuf的版本更好;

2.2 私聊消息缓存

私聊消息在redis中不设置缓存,在每个用户的内存结构中使用循环队列保存,如果离线,则内存也不保存离线消息,只在离线的数据库中保存。

单机模式下,用户A的数据的加载可能是因为对方给A发送数据,所以即便缓存数据,(因为服务器可能重启过)也未必是所有的离线数据;

集群模式下,用户A和聊天的对象不一定在同一台服务器上,即便某台服务器内存缓存了A的离线数据,下次登录页未必一定在这台服务器登录,所以内存缓存没有意义;

而redis缓存中的user信息的hash表中可以保存一个用户最后收到的消息的msgId,那么从这个ID开始搜索就加载所有未同步的离线数据了;

那么,当每次用户提交接收回执的时候,需要记录最后一条回执的ID,为了减少redis的开销,可以每30秒执行一次redis同步;

但是,其实也不需要保存最后的ID,还是让用户根据msgId反向加载即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/510644.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

蚁剑流量分析

蚁剑流量分析 在靶机上面上传一个一句话木马&#xff0c;并使用蚁剑连接&#xff0c;进行抓包, 一句话木马内容 <?php eval($_POST[1]); defalut编码器 在使用蚁剑连接的时候使用default编码器 连接之后进行的操作行为是查看当前目录(/var/www/html)下的文件&#xff0…

网易云首页单页面html+css

网页设计与网站建设作业htmlcss 预览 源码查看https://hpc.baicaitang.cn/2083.html

书生 浦语 大模型趣味 Demo

目录 一. 部署 InternLM2-Chat-1.8B 模型进行智能对话 1. 环境准备 2. 下载模型参数 3. 运行Demo 二. 部署实战营 八戒-Chat-1.8B 模型 1. 下载Demo仓库 2. 启动web服务端加载八戒模型&#xff1a; 3. 将SSH远程端口映射到本地 4. 在本地浏览器打开&#xff1a;http:/…

【C++第二阶段】案例-职工管理系统

以下内容仅为当前认识&#xff0c;可能有不足之处&#xff0c;欢迎讨论&#xff01; 文章目录 案例>职工管理系统0.退出功能1.增加职工功能2.显示职工信息3.删除职工信息4.修改职工信息5.查找职工信息6.排序职工7.清空所有文档 案例>职工管理系统 首先写一个workmanager…

Adobe ColdFusion 任意文件读取漏洞复现(CVE-2024-20767)

0x01 产品简介 Adobe ColdFusion是美国奥多比(Adobe)公司的一套快速应用程序开发平台。该平台包括集成开发环境和脚本语言,将可扩展、改变游戏规则且可靠的产品的愿景变为现实。 0x02 漏洞概述 由于 Adobe ColdFusion 的访问控制不当,未经身份认证的远程攻击者可以构造恶…

夜晚兼职好选择:六大副业助你增收

晚上兼职&#xff0c;无疑是许多寻求额外收入人群的理想选择。以下为您精心推荐的六个副业&#xff0c;既适合晚间操作&#xff0c;又能让您在轻松愉悦中赚取额外收益。 网络调查与市场研究&#xff1a;利用晚上的闲暇时光&#xff0c;参与网络调查与市场研究&#xff0c;为企业…

《QT实用小工具·七》CPU内存显示控件

1、概述 源码放在文章末尾 CPU内存显示控件 项目包含的功能如下&#xff1a; 实时显示当前CPU占用率。实时显示内存使用情况。包括共多少内存、已使用多少内存。全平台通用&#xff0c;包括windows、linux、ARM。发出信号通知占用率和内存使用情况等&#xff0c;以便自行显示…

思腾合力与中科创达联合推出的迅思代码生成一体机产品

思腾合力与中科创达联合推出的迅思代码生成一体机产品&#xff0c;基于思腾合力强大算力底座&#xff0c;搭载中科创达自研国产大模型&#xff0c;面向众多有编程开发需求的客户&#xff0c;简化编程和软件开发过程 &#xff0c;降低编程门槛&#xff0c;全方位提升开发和生产效…

群晖NAS使用Docker部署大语言模型Llama 2结合内网穿透实现公网访问本地GPT聊天服务

文章目录 1. 拉取相关的Docker镜像2. 运行Ollama 镜像3. 运行Chatbot Ollama镜像4. 本地访问5. 群晖安装Cpolar6. 配置公网地址7. 公网访问8. 固定公网地址 随着ChatGPT 和open Sora 的热度剧增,大语言模型时代,开启了AI新篇章,大语言模型的应用非常广泛&#xff0c;包括聊天机…

ssm018简易版营业厅宽带系统+jsp

营业厅宽带系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本营业厅宽带系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间…

【饿了么笔试题汇总】-2024-04-02-饿了么春招笔试题-三语言题解(CPP/Python/Java)

&#x1f36d; 大家好这里是KK爱Coding &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新饿了么近期的春秋招笔试题汇总&#xff5e; &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x…

整型之韵,数之舞:大小端与浮点数的内存之旅

✨✨欢迎&#x1f44d;&#x1f44d;点赞☕️☕️收藏✍✍评论 个人主页&#xff1a;秋邱’博客 所属栏目&#xff1a;人工智能 &#xff08;感谢您的光临&#xff0c;您的光临蓬荜生辉&#xff09; 1.0 整形提升 我们先来看看代码。 int main() {char a 3;char b 127;char …

枚举---算法

1、定义 枚举算法&#xff1a;也称之为穷举算法&#xff0c;这种算法就是在解决问题的时候去使用所有的方式去解决这个问题&#xff0c;会通过推理去考虑事件发生的每一种可能&#xff0c;最后推导出结果。优点&#xff1a;简单粗暴&#xff0c;它暴力的枚举所有可能&#xff…

算法学习——LeetCode力扣图论篇1(797. 所有可能的路径、200. 岛屿数量、695. 岛屿的最大面积)

算法学习——LeetCode力扣图论篇1 797. 所有可能的路径 797. 所有可能的路径 - 力扣&#xff08;LeetCode&#xff09; 描述 给你一个有 n 个节点的 有向无环图&#xff08;DAG&#xff09;&#xff0c;请你找出所有从节点 0 到节点 n-1 的路径并输出&#xff08;不要求按特…

STM32 DWT数据观察触发器作为延时函数的使用

STM32 DWT数据观察触发器作为延时函数的使用 &#x1f4d1;DWT(Data Watchpoint and Trace数据观察触发器&#xff09;描述 &#x1f4dd;DWT是属于处理器内核单元中的调试组件之一&#xff0c;由四个比较器组成。它们可配置为&#xff1a;硬件监视点或对ETM或PC采样器或数据地…

Ubuntu20.04安装MatlabR2018a

一、安装包 安装包下载链接 提取码&#xff1a;kve2 网上相关教程很多&#xff0c;此处仅作为安装软件记录&#xff0c;方便后续软件重装&#xff0c;大家按需取用。 二、安装 1. 相关文件一览 下载并解压文件后&#xff0c;如下图所示&#xff1a; 2. 挂载镜像并安装 2…

06 | Swoole 源码分析之 Coroutine 协程模块

首发原文链接&#xff1a;Swoole 源码分析之 Coroutine 协程模块 大家好&#xff0c;我是码农先森。 引言 协程又称轻量级线程&#xff0c;但与线程不同的是&#xff1b;协程是用户级线程&#xff0c;不需要操作系统参与。由用户显式控制&#xff0c;可以在需要的时候挂起、或…

回顾快速排序

快速排序 快速排序的核心&#xff1a; 找到一个key 通常左边的数比key小&#xff0c;右边的数比key大。 找key通常有三种方法&#xff1a; 1. 挖坑法&#xff1a; 代码实现&#xff1a; // int _pivot(int* a, int left, int right) {int begin left, end right;int in…

动态图学习新突破!最新SOTA实现性能全面升级,效率与精度兼得

现实世界中的许多图数据是动态变化的&#xff0c;比如社交网络、交通流量等。而传统的图学习方法通常处理的是静态图&#xff0c;这就导致它缺乏处理动态变化的能力&#xff0c;在适应性方面存在局限性。 相较之下&#xff0c;动态图学习能够捕捉到图数据的动态变化&#xff0…

MuJoCo 入门教程(一)

系列文章目录 前言 一、简介 MuJoCo 是多关节接触动力学&#xff08;Multi-Joint dynamics with Contact&#xff09;的缩写。它是一个通用物理引擎&#xff0c;旨在促进机器人、生物力学、图形和动画、机器学习以及其他需要快速、准确地仿真铰接结构与环境交互的领域的研究和开…