[集群聊天服务器]----(十一) 使用Redis实现发布订阅功能

接着上文,[集群聊天服务器]----(十)Nginx的tcp负载均衡配置–附带截图,我们配置nginx,使用了多台服务端来提高单机的并发量,接下来我们回到项目中,思考一下,各个服务端之间怎么进行通信呢?

配置Nginx以后,怎么保证跨服务器通信呢?

使用集群服务器,有多个服务器维护自己的用户列表。ChatServer1与ChatServer2的用户聊天,ChatServer1在自己服务器的用户表中找不到,但是可能用户在线,所以我们需要保证跨服务器间的通信!
但是如果让后端的服务器之间互相连接,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。ChatServer维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。这样的设计使得各个服务器之间耦合度太高 ,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,并且存在一个服务器瘫痪其余都崩溃的情况,不采用
在这里插入图片描述
所以引入中间件消息队列,解耦各个服务器, 使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。但是本项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。有关于redis的安装,在我的另一篇博客中有详细的介绍,Linux下安装redis并配置开机自启保姆级教程-----附带每一步截图
在这里插入图片描述

Redis发布-订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的通道。当有新消息通过 publish 命令发送给通道 时, 这个消息就会被发送给订阅它的客户端。
需要注意的是:这里的subscribe是以阻塞的形式等待publish端发送消息的,publish是一有消息就发送的。

实现

重要成员变量

// hiredis同步上下文对象,负责publish消息
redisContext *_publish_context;

// hiredis同步上下文对象,负责subscribe消息
redisContext *_subcribe_context;

// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
  • redisContext为redis提供的类

重要成员函数

Redis();
~Redis();

// 连接redis服务器 
bool connect();

// 向redis指定的通道channel发布消息
bool publish(int channel, string message);

// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);

// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);

// 在独立线程中接收订阅通道中的消息
void observer_channel_message();

// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);

构造与析构函数

Redis::Redis()
    : _publish_context(nullptr), _subcribe_context(nullptr)
{
}

Redis::~Redis()
{
    //释放资源
    if (_publish_context != nullptr)
    {
        redisFree(_publish_context);
    }

    if (_subcribe_context != nullptr)
    {
        redisFree(_subcribe_context);
    }
}
  • 构造与析构函数重要完成对两个对象的初始化以及释放资源

连接函数

bool Redis::connect()
{
    _publish_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _publish_context)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 负责subscribe订阅消息的上下文连接
    _subcribe_context = redisConnect("127.0.0.1", 6379);
    if (nullptr == _subcribe_context)
    {
        cerr << "connect redis failed!" << endl;
        return false;
    }

    // 在单独的线程中,监听通道上的事件,有消息给业务层进行上报
    thread t([&]()
             { observer_channel_message(); });
    t.detach();

    cout << "connect redis-server success!" << endl;

    return true;
}
  • _publish_context 负责publish发布消息的上下文连接 6379 是 redis-server 监听的端口号
  • _subcribe_context负责subscribe订阅消息的上下文连接
  • 在单独的线程中,监听通道上的事件,有消息给业务层进行上报

发布消息

bool Redis::publish(int channel, string message)
{
    redisReply *reply = (redisReply *)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
    if (nullptr == reply)
    {
        cerr << "publish command failed!" << endl;
        return false;
    }
    freeReplyObject(reply); //释放
    return true;
}
  • 主要完成向redis指定的通道channel发布消息
  • 值得注意的是: redisCommand相当于在redis中敲了一个命令 通道号和消息,先把要发送的命令 缓存到本地 调用了redisAppendCommand,然后调用了redisBufferWrite 把命令发送到redis-server上,最后调用redisGetReply 阻塞等待redis server响应消息,publish一执行马上就回复了,所以可以使用redisCommand
  • 注意释放资源

订阅消息

bool Redis::subscribe(int channel)
{
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "SUBSCRIBE %d", channel))
    {
        cerr << "subscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done))
        {
            cerr << "subscribe command failed!" << endl;
            return false;
        }
    }
    // redisGetReply

    return true;
}
  • 主要完成向redis指定的通道subscribe订阅消息
  • 值得注意的是: 订阅消息不会向发布消息一样使用redisCommand命令。因为subscribe命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息,通道消息的接收专门在observer_channel_message函数中的独立线程中进行,只负责发送命令,不阻塞接收redis server响应消息,否则和notifyMsg线程抢占响应资源

取消订阅

bool Redis::unsubscribe(int channel)
{
    if (REDIS_ERR == redisAppendCommand(this->_subcribe_context, "UNSUBSCRIBE %d", channel))
    {
        cerr << "unsubscribe command failed!" << endl;
        return false;
    }
    // redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
    int done = 0;
    while (!done)
    {
        if (REDIS_ERR == redisBufferWrite(this->_subcribe_context, &done))
        {
            cerr << "unsubscribe command failed!" << endl;
            return false;
        }
    }
    return true;
}
  • 主要完成向redis指定的通道unsubscribe取消订阅消息

在独立线程中接收订阅通道中的消息

void Redis::observer_channel_message()
{
    redisReply *reply = nullptr;
    while (REDIS_OK == redisGetReply(this->_subcribe_context, (void **)&reply))
    {
        if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
        {
            // 给业务层上报通道上发生的消息
            _notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
        }

        freeReplyObject(reply);
    }

    cerr << ">>>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<<" << endl;
}
  • 订阅收到的消息是一个带三元素的数组 element[2] 就是消息 element[1] 通道号

初始化向业务层上报通道消息的回调对象

void Redis::init_notify_handler(function<void(int, string)> fn)
{
    this->_notify_message_handler = fn;
}

怎么在项目中使用呢?

在前面的剖析中,我们多多少少也看到了redis的身影,主要是在业务模块使用了它,下面我们在具体看一下在那些部分使用到了redis。

  • 在ChatService类中,首先我们创建了一个redis的操作对象
Redis _redis;
  • 在用户登录成功以后,我们向redis订阅了通道,这里使用id作为通道号
_redis.subscribe(id);
  • 然后创建了一个函数从redis消息队列中获取订阅的消息
void handleRedisSubscribeMessage(int, string);
  • 利用这个操作对象,我们连接了服务器,并设置了上报消息的回调
if(_redis.connect())
{
    _redis.init_notify_handler(std::bind(&ChatService::handleRedisSubscribeMessage, this, _1,_2));
}
  • 在查询到用户在线,但是不在同一个服务端的时候,我们就会调用redis的回调函数
if (user.getState() == "online")
    {
        _redis.publish(toid, js.dump());
        return;
    }
  • 具体实现如下:
void ChatService::handleRedisSubscribeMessage(int userid, string msg)
{
    lock_guard<mutex> lock(_connMutex);
    auto it = _userConnMap.find(userid);
    if (it != _userConnMap.end())
    {
        it->second->send(msg);
        return;
    }

    // 存储该用户的离线消息
    _offlineMsgModel.insert(userid, msg);
}
  • 根据userid寻找用户是否存在,存在就发送消息,不存在就存储他的离线消息
  • 用户注销,在redis中取消订阅通道
 _redis.unsubscribe(userid);

具体的使用就这么多,实现起来还是很简单的,完全足够本项目的开发。

项目测试

剖析到这里,整个项目就完结撒花了,接下来我们来做一个简单的测试,这里再次给出源码地址,在readme中,给出了详细的编译步骤,也给出了一键编译脚本,感兴趣的伙伴们可以拉下来试试。
在这里插入图片描述

  • 编译结束以后,我们启动两个服务端6000 6002 在nginx配置的两个
    在这里插入图片描述

  • 然后开启三个客户端,记得打开8000端口
    在这里插入图片描述
    在这里插入图片描述

  • 此时客户端,分配给了两个服务端
    在这里插入图片描述
    在这里插入图片描述

  • 进入一个终端,我们查看表里的内容
    在这里插入图片描述

  • 注册三个用户
    在这里插入图片描述

  • 表中数据,1 2是之前创建过的
    在这里插入图片描述

  • 登录
    在这里插入图片描述

  • 一对一聊天 3向4聊天,4不在线
    在这里插入图片描述

  • 查看离线消息
    在这里插入图片描述

  • 添加好友 3添加4
    在这里插入图片描述

  • 查看好友列表
    在这里插入图片描述

  • 1创建群
    在这里插入图片描述
    在这里插入图片描述

  • 4登录,显示离线消息
    在这里插入图片描述

  • 加入群
    在这里插入图片描述

  • 查看表
    在这里插入图片描述

  • 5登录,加入群

  • 群聊天
    在这里插入图片描述

  • 3与4聊天
    在这里插入图片描述

  • 3客户端退出
    在这里插入图片描述
    在这里插入图片描述

  • 服务端退出【ctrl+c】
    在这里插入图片描述

好了~ 关于集群聊天服务器的剖析就到此结束了,希望能够帮助到大家,也希望路过的大佬看到问题可以指出,感谢大家的支持,完结撒花~

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/652014.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

滑动窗口-java

主要通过单调队列来解决滑动窗口问题&#xff0c;得到滑动窗口中元素的最大值和最小值。 目录 前言 一、滑动窗口 二、算法思路 1.滑动窗口 2.算法思路 3.代码详解 三、代码如下 1.代码如下 2.读入数据 3.代码运行结果 总结 前言 主要通过单调队列来解决滑动窗口问题&#xff…

文件上传漏洞:pikachu靶场中的文件上传漏洞通关

目录 1、文件上传漏洞介绍 2、pikachu-client check 3、pikachu-MIME type 4、pikachu-getimagesize 最近在学习文件上传漏洞&#xff0c;这里使用pikachu靶场来对文件上传漏洞进行一个复习练习 废话不多说&#xff0c;开整 1、文件上传漏洞介绍 pikachu靶场是这样介绍文…

Docker快速安装SQL Server 2022

说明&#xff1a; 系统&#xff1a;Ubuntu 24.04 LTS 拉取SQL Server Docker镜像 docker pull mcr.microsoft.com/mssql/server:2022-CU12-ubuntu-22.04创建数据目录 sudo mkdir /var/mssql_data sudo chmod 777 /var/mssql_data说明&#xff1a; 权限设置为777&#xff0…

[集群聊天服务器]----(十)Nginx的tcp负载均衡配置--附带截图

接着上文&#xff0c;我们剖析了服务端和客户端的代码&#xff0c;但是单台服务器的并发量是有限的&#xff0c;面对并发量的要求&#xff0c;我们就需要引入Nginx来实现并发量的要求&#xff0c;将用户请求分发到不同的服务器上分担压力&#xff0c;这就是负载均衡。 选择负…

最新php项目加密源码

压缩包里有多少个php就会被加密多少个PHP、php无需安装任何插件。源码全开源 如果上传的压缩包里有子文件夹&#xff08;子文件夹里的php文件也会被加密&#xff09;&#xff0c;加密后的压缩包需要先修复一下&#xff0c;步骤&#xff1a;打开压缩包 》 工具 》 修复压缩文件…

JavaSE——集合框架二(2/6)-综合案例-斗地主游戏(做牌、洗牌、发牌、排序、看牌)

目录 需求与分析 具体实现 牌类定义 房间类定义 初步测试 启动游戏 运行案例 需求与分析 需求 总共有54张牌点数&#xff1a;"3","4","5","6","7","8","9","10","J",&qu…

MyBatis的基础操作

目录 一.什么是MyBatis? 二.使用MyBatis的准备工作 1.引入依赖: 2.配置数据库连接字符串(建立MaBatis和MySQL的连接) 3.在model包中建立数据库对应的实体类UserInfo 三.通过注解的方式实现MyBatis的开发 1.插入语句(Insert) 2.删除语句(Delete) 3.更新语句(Update) 4…

驱动开发:内核MDL读写进程内存

100编程书屋_孔夫子旧书网 MDL内存读写是最常用的一种读写模式,通常需要附加到指定进程空间内然后调用内存拷贝得到对端内存中的数据,在调用结束后再将其空间释放掉,通过这种方式实现内存读写操作,此种模式的读写操作也是最推荐使用的相比于CR3切换来说,此方式更稳定并不会…

c语言--结构体

前言 欢迎来到我的博客 个人主页:北岭敲键盘的荒漠猫-CSDN博客 结构体概念简介 c语言数组是一些相同类型的数据的集合。 这个结构体就是一些可以是不同类型的集合。 比如描述班里的一个人&#xff0c;他可能需要名字(字符串),也需要年龄(整数)。 这种情况就需要用结构体。 …

【Django】中间件实现钩子函数预处理和后处理,局部装饰视图函数

在app文件夹里新建middleware.py继承MiddlewareMixin&#xff0c; 编写中间件类&#xff0c;重写process_request、process_response钩子函数 from django.http import HttpRequest, HttpResponse from django.utils.decorators import decorator_from_middleware from django…

使用 Django 与 Redis 实现缓存优化

文章目录 什么是Redis&#xff1f;为什么选择Django与Redis&#xff1f;如何在Django中使用Redis&#xff1f;总结与拓展 在Web开发中&#xff0c;性能优化是一个至关重要的方面。而使用缓存是提高Web应用性能的常见方法之一。在这篇文章中&#xff0c;我们将探讨如何结合Djang…

layui扩展件(xm-select)实现下拉框

layui扩展件&#xff08;xm-select&#xff09;实现下拉框 扩展组件 xm-select 效果图 html代码 <div class"layui-inline"><label class"layui-form-label">职位</label><div class"layui-input-inline" style"wid…

Linux 使用 yum安装 ELK服务,yum 安装elasticsearch和Kibana(未写完)

文章目录 环境准备ELK组件介绍安装Elasticsearch安装Kibana 丢弃下载ELK 服务安装包Elasticsearch安装 Tips:关闭elasticsearch https 环境准备 ELK组件介绍 ElasticSearch &#xff1a; 是一个近实时&#xff08;NRT&#xff09;的分布式搜索和分析引擎&#xff0c;它可以用…

Chrome谷歌浏览器如何打开不安全页面的禁止权限?

目录 一、背景二、如何打开不安全页面被禁止的权限&#xff1f;2.1 第一步&#xff0c;添加信任站点2.2 第二步&#xff0c;打开不安全页面的权限2.3 结果展示 一、背景 在开发过程中&#xff0c;由于测试环境没有配置 HTTPS 请求&#xff0c;所以谷歌浏览器的地址栏会有这样一…

K8S-pod资源 探针

一.pod资源限制&#xff1a; 对pod资源限制原因&#xff1a;高并发占用所有的cpu资源、内存资源、会造成雪崩 当定义 Pod 时可以选择性地为每个容器设定所需要的资源数量。 最常见的可设定资源是 CPU 和内存大小&#xff0c;以及其他类型的资源。 方式&#xff1a; 对pod做…

金锋关晓柔短视频:成都鼎茂宏升文化传媒公司

金锋关晓柔短视频&#xff1a;情感与创意的交织 在短视频的浪潮中&#xff0c;无数创作者凭借独特的视角和创意脱颖而出。其中&#xff0c;金锋和关晓柔共同打造的短视频系列以其深厚的情感内涵和精湛的创意表达&#xff0c;成都鼎茂宏升文化传媒公司吸引了大量观众的关注&…

leetcode 1241每个帖子的评论数(postgresql)

需求 编写 SQL 语句以查找每个帖子的评论数。 结果表应包含帖子的 post_id 和对应的评论数 number_of_comments 并且按 post_id 升序排列。 Submissions 可能包含重复的评论。您应该计算每个帖子的唯一评论数。 Submissions 可能包含重复的帖子。您应该将它们视为一个帖子。…

C语言常用字符串处理函数

C语言中包含了很多对字符串处理的函数,要使用这些函数&#xff0c; 首先需要导入头文件#include <string.h> 1. strlen() -- 计算字符串长度 原型: size_t strlen(char const *string); 例: char *str "abcde"; size_t len strlen(str); // 结果为…

应用程序中的会话管理和Cookie安全指南

应用程序中的会话管理和Cookie安全指南 在现代应用程序中&#xff0c;会话管理和Cookie安全是确保用户信息和数据安全的重要组成部分。本文将详细介绍会话管理的最佳实践以及如何通过安全的Cookie设置来保护会话ID的交换。 单点登录&#xff08;SSO&#xff09;及会话管理机制…

Ubuntu24.04安装tabby-terminal-1.0.207并处理依赖

1 下载 tabby-terminal-1.0.207 地址&#xff1a; https://github.com/Eugeny/tabby/releases 点击show all 36 assets 选择 tabby-1.0.207-linux-x64.deb 并下载。 2 依赖下载 gconf2_3.2.6-3ubuntu6_amd64.deb gconf2-common_3.2.6-3ubuntu6_all.deb gconf-service_3.2.6-…