本节来实现C++集群聊天服务器项目中的点对点聊天业务,一起来试试吧
一、点对点聊天业务
聊天服务器中一个重要的功能就是实现点对点聊天,客户端发送的信息包含聊天业务msgid、自身
的id和姓名、聊天对象的id号以及聊天信息,例如:
{"msgid":5,"id":13,"name":"zhang san","to":15,"msg":"Hello,Boy!"}
id号为13的张三用户要与id号为15的用户进行 点对点聊天业务,发送消息为Hello,Boy!
如果要聊天,那便是双方的
{"msgid":5,"id":15,"name":"lisi","to":13,"msg":"Hello,Hello."}
id号为15的李四用户与id号为13的用户聊天,发送消息为Hello,Hello.
二、点对点业务步骤
(1)从json传来的数据中,获取聊天对象的id号
int to_id = js["to"].get<int>();
(2)点对点聊天,必须双方在线,如果不在线,则发送离线消息。因此需判断聊天对象是否在线,即是否在存储用户的通信连接的哈希表_userConnMap中,遍历哈希表查找在线用户id是否为聊天对象id,这里需保证线程安全,加锁
lock_guard<mutex>lock(_connMutex);
auto it = _userConnMap.find(to_id);
<1>如果聊天用户在线
服务器作为中转,主动推送消息给聊天对象用户
if(it!=_userConnMap.end()){
//to_id在线,转发消息,服务器主动推动消息给to_id用户
it->second->send(js.dump());
return ;
}
<2>如果聊天用户不在线,转而处理离线消息
底层数据库中,有一张offlinemessage表存储了离线消息
与处理user表类似,我们定义offlineMsgModel类处理离线消息业务,实现插入离线消息、删除离线消息、查询离线消息功能
class offlineMsgModel{
public:
//存储用户的离线消息
void insert(int userid,string msg);
//删除用户的离线消息
void remove(int userid);
//查询用户的离线消息
vector<string> query(int userid);
};
因此,当点对点聊天发现聊天对象不在线时,我们将json对象序列化后存入底层数据库中
因此,存储离线消息函数如下:
// 存储用户的离线消息
void offlineMsgModel::insert(int userid, string msg)
{
char sql[1024] = {0};
sprintf(sql, "insert into offlinemessage values(%d, '%s')", userid, msg.c_str());
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
(3)第(八)节用户登录模块,用户登录后需查看是否有对应的离线消息,如果有服务器就将数据库中存储的离线消息推送给相应的客户端,然后删除库中存储的离线消息
<1>查看用户是否有离线消息
实现offlineMsgModel的查询函数,用一个vector容器来接收字符串消息
// 查询用户的离线消息
vector<string> offlineMsgModel::query(int userid)
{
char sql[1024] = {0};
sprintf(sql, "select message from offlinemessage where userid = %d", userid);
vector<string> vec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
// 把userid用户的所有离线消息放入vec中返回
MYSQL_ROW row;
while((row = mysql_fetch_row(res)) != nullptr)
{
vec.push_back(row[0]);
}
mysql_free_result(res);
return vec;
}
}
return vec;
}
(2)如果查询离线消息不为空,在响应json对象添加离线消息,然后将底层数据库中离线消息清除
// 删除用户的离线消息
void offlineMsgModel::remove(int userid)
{
char sql[1024] = {0};
sprintf(sql, "delete from offlinemessage where userid = %d", userid);
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
(3)回调返回json字符串,登录业务中查看离线消息模块如下:
// 查询该用户是否有离线消息
vector<string> vec = _offlineMsgModel.query(id);
if (!vec.empty())
{
response["offlinemsg"] = vec;
// 读取该用户的离线消息后,把该用户的所有离线消息删除掉
_offlineMsgModel.remove(id);
}
conn->send(response.dump()); // 回调 ,返回json字符串
三、点对点业务代码实现
3.1offlineMsgModel类
在include/server/model中创建offlinemessagemodel.hpp头文件
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H
#include <string>
#include <vector>
using namespace std;
//提供离线消息表的操作接口方法
class offlineMsgModel{
public:
//存储用户的离线消息
void insert(int userid,string msg);
//删除用户的离线消息
void remove(int userid);
//查询用户的离线消息
vector<string> query(int userid);
};
#endif
3.2 在src/server/model中创建offlinemessagemodel.cpp进行实现
#include "offlinemessagemodel.hpp"
#include "db.hpp"
// 存储用户的离线消息
void offlineMsgModel::insert(int userid, string msg)
{
char sql[1024] = {0};
sprintf(sql, "insert into offlinemessage values(%d, '%s')", userid, msg.c_str());
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 删除用户的离线消息
void offlineMsgModel::remove(int userid)
{
char sql[1024] = {0};
sprintf(sql, "delete from offlinemessage where userid = %d", userid);
MySQL mysql;
if (mysql.connect())
{
mysql.update(sql);
}
}
// 查询用户的离线消息
vector<string> offlineMsgModel::query(int userid)
{
char sql[1024] = {0};
sprintf(sql, "select message from offlinemessage where userid = %d", userid);
vector<string> vec;
MySQL mysql;
if (mysql.connect())
{
MYSQL_RES *res = mysql.query(sql);
if (res != nullptr)
{
// 把userid用户的所有离线消息放入vec中返回
MYSQL_ROW row;
while((row = mysql_fetch_row(res)) != nullptr)
{
vec.push_back(row[0]);
}
mysql_free_result(res);
return vec;
}
}
return vec;
}
3.2 publi.hpp聊天消息
#ifndef PUBLIC_H
#define PUBLIC_H
/*
server和client的公共文件
*/
enum EnMsgType
{
LOGIN_MSG = 1, //登录消息
LOGIN_MSG_ACK, //登录响应消息
REG_MSG, //注册消息
REG_MSG_ACK, //注册响应消息
ONE_CHAT_MSG, //聊天消息
};
#endif
3.3 处理用户点对点聊天业务
在chatservice.hpp中创建离线消息操作对象,创建处理点对点聊天业务函数oneChat
#ifndef CHATSERVICE_H
#define CHATSERVICE_H
#include <muduo/net/TcpConnection.h>
#include <unordered_map>//一个消息ID映射一个事件处理
#include <functional>
#include <mutex>
using namespace std;
using namespace muduo;
using namespace muduo::net;
#include "usermodel.hpp"
#include "offlinemessagemodel.hpp"
#include "json.hpp"
using json = nlohmann::json;
//表示处理消息的事件回调方法类型,事件处理器,派发3个东西
using MsgHandler = std::function<void(const TcpConnectionPtr &conn, json &js, Timestamp)>;
//聊天服务器业务类
class ChatService
{
public:
//获取单例对象的接口函数
static ChatService *instance();
//处理登录业务
void login(const TcpConnectionPtr &conn, json &js, Timestamp time);
//处理注册业务
void reg(const TcpConnectionPtr &conn, json &js, Timestamp time);
//处理点对点聊天消息
void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
//处理客户端异常退出
void clientCloseException(const TcpConnectionPtr &conn);
//获取消息对应的处理器
MsgHandler getHandler(int msgid);
private:
ChatService();//单例
//存储消息id和其对应的业务处理方法,消息处理器的一个表,写消息id对应的处理操作
unordered_map<int, MsgHandler> _msgHandlerMap;
//存储用户的通信连接
unordered_map<int,TcpConnectionPtr> _userConnMap;
//定义互斥锁,保证_userConnMap的线程安全
mutex _connMutex;
//数据操作类对象
UserModel _userModel;
offlineMsgModel _offlineMsgModel;
};
#endif
在chatservice.cpp中进行实现
#include "chatservice.hpp"
#include "public.hpp"
#include <muduo/base/Logging.h> //muduo的日志
using namespace std;
using namespace muduo;
// 获取单例对象的接口函数
ChatService *ChatService::instance()
{
static ChatService service;
return &service;
}
// 构造方法,注册消息以及对应的Handler回调操作
ChatService::ChatService()
{
// 用户基本业务管理相关事件处理回调注册
_msgHandlerMap.insert({LOGIN_MSG, std::bind(&ChatService::login, this, _1, _2, _3)});
_msgHandlerMap.insert({REG_MSG, std::bind(&ChatService::reg, this, _1, _2, _3)});
_msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat,this,_1,_2,_3)});
}
// 获取消息对应的处理器
MsgHandler ChatService::getHandler(int msgid)
{
// 记录错误日志,msgid没有对应的事件处理回调
auto it = _msgHandlerMap.find(msgid);
if (it == _msgHandlerMap.end()) // 找不到
{
// 返回一个默认的处理器,空操作,=按值获取
return [=](const TcpConnectionPtr &conn, json &js, Timestamp)
{
LOG_ERROR << "msgid:" << msgid << " can not find handler!"; // muduo日志会自动输出endl
};
}
else // 成功的话
{
return _msgHandlerMap[msgid]; // 返回这个处理器
}
}
// 处理登录业务 id pwd pwd
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
int id = js["id"].get<int>();
string pwd = js["password"];
User user = _userModel.query(id);
if (user.getId() == id && user.getPwd() == pwd)
{
if (user.getState() == "online")
{
// 该用户已经登录,不允许重复登录
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 2;
response["errmsg"] = "该账号已经登录,请重新输入新账号";
conn->send(response.dump()); // 回调 ,返回json字符串
}
else
{
{
// 登陆成功,记录用户连接
lock_guard<mutex> lock(_connMutex);
_userConnMap.insert(make_pair(id, conn));
}
// 登录成功,更新用户状态信息
user.setState("online");
_userModel.updateState(user);
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 0;
response["errmsg"] = "登录成功!";
response["id"] = user.getId();
response["name"] = user.getName();
// 查询该用户是否有离线消息
vector<string> vec = _offlineMsgModel.query(id);
if (!vec.empty())
{
response["offlinemsg"] = vec;
// 读取该用户的离线消息后,把该用户的所有离线消息删除掉
_offlineMsgModel.remove(id);
}
conn->send(response.dump()); // 回调 ,返回json字符串
}
}
else
{
// 用户名或者密码错误
json response;
response["msgid"] = LOGIN_MSG_ACK;
response["errno"] = 1;
response["errmsg"] = "用户名或者密码错误";
conn->send(response.dump()); // 回调 ,返回json字符串
}
}
// 处理注册业务 name password
void ChatService::reg(const TcpConnectionPtr &conn, json &js, Timestamp time)
{
string name = js["name"]; // 获取名字
string pwd = js["password"]; // 获取密码
User user; // 创建用户对象
user.setName(name);
user.setPwd(pwd);
bool state = _userModel.insert(user); // 新用户的插入
if (state) // 插入成功
{
// 注册成功
json response;
response["msgid"] = REG_MSG_ACK;
response["errno"] = 0;
response["id"] = user.getId();
conn->send(response.dump()); // 回调 ,返回json字符串
}
else // 插入失败
{
// 注册失败
json response;
response["msgid"] = REG_MSG_ACK;
response["errno"] = 1;
conn->send(response.dump()); // 回调 ,返回json字符串
}
}
// 处理客户端异常退出
void ChatService::clientCloseException(const TcpConnectionPtr &conn)
{
User user;
{
lock_guard<mutex> lock(_connMutex);
for (auto it = _userConnMap.begin(); it != _userConnMap.end(); it++)
{
if (it->second == conn)
{
user.setId(it->first);
// 从map表中删除用户的连接信息
_userConnMap.erase(it);
break;
}
}
}
// 更新用户的状态信息
if (user.getId() != -1)
{
user.setState("offline");
_userModel.updateState(user);
}
}
//处理点对点聊天消息
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time){
int to_id = js["to"].get<int>();
{
lock_guard<mutex>lock(_connMutex);
auto it = _userConnMap.find(to_id);
if(it!=_userConnMap.end()){
//to_id在线,转发消息,服务器主动推动消息给to_id用户
it->second->send(js.dump());
return ;
}
}
//to_id不在线,处理离线消息
_offlineMsgModel.insert(to_id,js.dump());
}
三、功能验证
分别使用张三和李四的账号登录聊天服务器
可以看到张三和李四登录成功,数据库显示在线
两人分别互通消息,验证点对点聊天业务是否成功实现
点对点聊天业务验证成功
两人分别离线,异常退出,服务器记录
张三再次上线,给离线的李四发消息
张三问李四How are you!
此时李四不在线,存储入离线消息数据库中
李四登录,可以看到张三发来的离线消息已成功推送,此外底层数据库离线表也清空!
点对点聊天业务功能验证完毕!
感兴趣的小伙伴一起来试一下吧~
如果有问题还请及时联系我哦,感谢~
四、项目流程
1、项目环境搭建
C++项目——集群聊天服务器项目(一)项目介绍、环境搭建、Boost库安装、Muduo库安装、Linux与vscode配置_c++集群聊天服务器-CSDN博客
2、Json第三方库介绍
C++项目——集群聊天服务器项目(二)Json第三方库-CSDN博客
3、muduo网络库介绍
C++项目——集群聊天服务器项目(三)muduo网络库-CSDN博客
4、MySQL数据库创建
C++项目——集群聊天服务器项目(四)MySQL数据库-CSDN博客
5、网络模块与业务模块代码编写
C++项目——集群聊天服务器项目(五)网络模块与业务模块-CSDN博客
6、MySQL模块编写
C++项目——集群聊天服务器项目(六)MySQL模块-CSDN博客
7、Model层设计、注册业务实现
C++项目——集群聊天服务器项目(七)Model层设计、注册业务实现-CSDN博客
8、用户登录业务
C++项目——集群聊天服务器项目(八)用户登录业务-CSDN博客
9、客户端异常退出业务
C++项目——集群聊天服务器项目(九)客户端异常退出业务-CSDN博客