本次实现的Redis连接池是一个单例且多线程安全的连接池。
主要实现的功能为:读取配置,将配置中对应建立redis连接并加入到连接池中,然后从连接池中取出连接使用。每当配置进行修改,重新往池子中加入连接。
- 通用类
实现一些基础都会使用的接口,定义宏等。
有些类是利用RAII思想,构造时获得对象,初始化锁,加锁等,析构时释放对象,释放锁资源,释放锁等。
#pragma once
#include <iostream>
#include "../../hiredis/hiredis/hiredis.h"
#include <pthread.h>
#include <cstring>
#include <algorithm>
#include <map>
#define SAFE_DELETE(x) {if (x != nullptr){ delete x, (x) = nullptr; }}
class AutoReply
{
public:
AutoReply(redisReply* reply)
{
reply_ = nullptr;
if(reply)
{
reply_ = reply;
}
}
~AutoReply()
{
if(reply_)
{
freeReplyObject(reply_);
reply_ = nullptr;
}
}
redisReply* get()
{
return reply_;
}
bool isErr()
{
if (!reply_)
return true;
if (reply_->type == REDIS_REPLY_ERROR)
{
std::cout << "reply error " << reply_->str << std::endl;
return true;
}
if (reply_->type == REDIS_REPLY_STATUS)
{
std::string tmp_str;
//大写转小写
//std::transform(reply_->str, reply_->str + strlen(reply_->str), tmp_str.begin(), tolower);
for(int i = 0; i < strlen(reply_->str); ++i)
{
char c = reply_->str[i];
if(isupper(reply_->str[i]))
{
c = tolower(c);
}
tmp_str += tolower(c);
}
//std::cout << "iserr str" << tmp_str.c_str() << std::endl;
if (strcmp(tmp_str.c_str(), "ok") != 0)
{
std::cout << "reply statue not ok " << tmp_str << "replystr: " << reply_->str << std::endl;
return true;
}
}
return false;
}
bool isPingErr()
{
if (!reply_)
return true;
if (reply_->type == REDIS_REPLY_ERROR)
{
std::cout << "reply error " << reply_->str << std::endl;
return true;
}
if (reply_->type == REDIS_REPLY_STATUS)
{
std::string tmp_str;
//大写转小写
std::transform(reply_->str, reply_->str + strlen(reply_->str), tmp_str.begin(), tolower);
//std::cout << "isPingErr str" << tmp_str.c_str() << std::endl;
if (strcmp(tmp_str.c_str(), "pong") != 0)
{
std::cout << "reply statue not pong " << tmp_str << std::endl;
return true;
}
}
return false;
}
AutoReply(const AutoReply& ar)
{
reply_ = ar.reply_;
}
private:
AutoReply operator=(const AutoReply& ar);
redisReply* reply_;
};
class qMutex
{
public:
qMutex()
{
pthread_mutex_init(&mt_, nullptr);
}
~qMutex()
{
pthread_mutex_destroy(&mt_);
}
void lock()
{
pthread_mutex_lock(&mt_);
}
void unlock()
{
pthread_mutex_unlock(&mt_);
}
private:
qMutex(const qMutex& qmt);
qMutex operator=(const qMutex& qmt);
pthread_mutex_t mt_;
};
class AutoMutex
{
public:
AutoMutex(qMutex& mutex):mutex_(mutex)
{
mutex_.lock();
}
~AutoMutex()
{
mutex_.unlock();
}
private:
//AutoMutex(const AutoMutex& am);
AutoMutex& operator=(const AutoMutex& am);
qMutex& mutex_;
};
- 操作redis类
主要是连接redis,操作redis,有密码需要使用auth命令验证密码。
#pragma once
#include "common.h"
enum ConnectState {
CONNECTSTATE_NONE,
CONNECTSTATE_CONN,//已连接
CONNECTSTATE_UNCONN,//断开连接
};
class redisConn{
public:
redisConn(size_t id, std::string ip = "127.0.0.1", size_t port = 6379, std::string passwd = "123456");
~redisConn();
bool connect();
bool connectUnblock();
bool disConnect();
bool isRun();
bool auth();
bool ping();
bool reconnect();
//断线重连
void resetReconnectTime() \
{
AutoMutex mt(locker_);
reconnect_times_ = 0;
}
bool keepAlive();
bool set(const char* key, const char* val);
std::string get(const char* key);
private:
size_t id_;
std::string ip_;
size_t port_;
std::string passwd_;
ConnectState state_;//连接状态
size_t reconnect_times_;//连接次数
redisContext* context_;
redisReply* reply_;
qMutex locker_;//为了保证线程安全
};
#include "redisConn.h"
redisConn::redisConn(size_t id, std::string ip, size_t port, std::string passwd)
{
id_ = id;
ip_ = ip;
port_ = port;
passwd_ = passwd;
state_ = CONNECTSTATE_NONE;
context_ = nullptr;
}
redisConn::~redisConn()
{
disConnect();
}
bool redisConn::connect()
{
if (context_ != nullptr)
{
redisFree(context_);
context_ = nullptr;
}
context_ = redisConnect(ip_.c_str(), port_);
if (!context_)
{
std::cerr << "connect fail" << std::endl;
return false;
}
if(context_->err)
{
std::cerr << "connect err " << context_->errstr << std::endl;
redisFree(context_);
context_ = nullptr;
return false;
}
state_ = CONNECTSTATE_CONN;
std::cout << "connect succ" << std::endl;
//验证密码
auth();
return true;
}
bool redisConn::connectUnblock()
{
if (context_)
{
redisFree(context_);
context_ = nullptr;
}
context_ = redisConnectNonBlock(ip_.c_str(), port_);
if(!context_)
{
std::cout << "connect fail" << std::endl;
return false;
}
if(context_->err)
{
std::cout << "connect err " << context_->errstr << std::endl;
redisFree(context_);
context_ = nullptr;
return false;
}
state_ = CONNECTSTATE_CONN;
std::cout << "conn no block succ" << std::endl;
//验证密码
//auth();
return true;
}
bool redisConn::disConnect()
{
AutoMutex mt(locker_);
if (context_)
{
redisFree(context_);
context_ = nullptr;
//SAFE_DELETE(context_);
}
state_ = CONNECTSTATE_UNCONN;
std::cout << "disConnect succ" << std::endl;
return true;
}
bool redisConn::isRun()
{
AutoMutex mt(locker_);
return state_ == CONNECTSTATE_CONN;
}
bool redisConn::auth()
{
AutoMutex mt(locker_);
AutoReply reply = (redisReply*)redisCommand(context_, "auth %s", passwd_.c_str());
if (reply.isErr())
return false;
std::cout << "auth succ" << std::endl;
return true;
}
bool redisConn::ping()
{
//检查服务器是否在运行
AutoMutex mt(locker_);
AutoReply reply = (redisReply*)redisCommand(context_, "ping");
if(reply.isPingErr())
return false;
return true;
}
bool redisConn::reconnect()
{
if(!connect() && !auth())
{
std::cout << "connect un block fail" << std::endl;
return false;
}
state_ = CONNECTSTATE_CONN;
std::cout << "reconnect succ" << std::endl;
return true;
}
bool redisConn::keepAlive()
{
if (state_ == CONNECTSTATE_CONN)
{
if(!ping())
{
if(!reconnect())
{
std::cout << "断线" << ip_ << ":" << port_ << std::endl;
state_ = CONNECTSTATE_UNCONN;
return false;
}
return true;
}
else if(state_ == CONNECTSTATE_UNCONN)
{
//断线重连
if (reconnect_times_ < 10)
{
reconnect_times_++;
if(reconnect())
{
reconnect_times_ = 0;
state_ = CONNECTSTATE_CONN;
return true;
}
}
}
}
return false;
}
bool redisConn::set(const char* key, const char* val)
{
AutoMutex mt(locker_);
if(!context_)
return false;
AutoReply reply = (redisReply*)redisCommand(context_, "set %s %s", key, val);
if(reply.isErr())
{
return false;
}
return true;
}
std::string redisConn::get(const char* key)
{
AutoMutex mt(locker_);
if(!context_)
return "";
AutoReply reply = (redisReply*)redisCommand(context_, "get %s", key);
if(reply.isErr())
{
return "";
}
return reply.get()->str ? reply.get()->str : "";
}
- 单例泛型类
对应的连接池和读取配置只需要一个对象,可以实现为单例。
使用这个单例:
- 继承该单例类(继承单例接口),并且设为友元(由于构造,拷贝构造,赋值重载运算符设为私有,单例类需要使用)。
- 设为友元。
两种方式使用方法不同,看下面例子。
#pragma once
#include "common.h"
template<class T>
class Singleton
{
public:
static T* getMe()
{
if(single_ == nullptr)
{
mt_.lock();
if(single_ == nullptr)
{
single_ = new T;
}
mt_.unlock();
}
return single_;
}
static void delMe()
{
if(single_)
{
SAFE_DELETE(single_);
}
}
protected:
//需要写出protected,不然继承的构造无法调用
Singleton()
{
}
~Singleton()
{
delMe();
}
private:
//外部不允许赋值喝拷贝构造
Singleton(const Singleton& );
const Singleton& operator=(const Singleton& );
static T* single_;
static qMutex mt_;
};
template<class T>
T* Singleton<T>::single_ = nullptr;
template<class T>
qMutex Singleton<T>::mt_;
使用实例:
- 连接池类和读配置类
连接池主要实现了将连接加入到连接池中,和从连接池中拿出连接。
读取配置,配置主要为读取唯一标识——服务器ip——redis端口。如果需要和可以读取密码等。
连接池中的每一个连接有一个唯一标识,通过标识来获取对一个连接,标识可以是对应表冈功能(比如:如果是游戏中的排行榜,可以是不同的排行榜,也可以是不同玩家,每个玩家对应一个redis)。
#pragma once
#include "qSingleton.h"
#include "redisConn.h"
#include <fstream>
#include <cstring>
struct ConnectInfo
{
ConnectInfo()
{
state_ = 0;
}
redisConn* con_;
int state_;
};
class redisPool : public Singleton<redisPool>
{
friend class Singleton<redisPool>;
public:
bool addClient(size_t id, std::string ip, size_t port);
bool delClient();
bool keepAlive();
bool updateCfgConn();
redisConn* getClientByID(size_t id);
private:
redisPool();
~redisPool();
redisPool(const redisPool& );
const redisPool& operator=(const redisPool&);
ConnectInfo* getConnectInfo(size_t id);
typedef std::map<size_t, ConnectInfo> ConnectInfoT;
ConnectInfoT connect_;
};
struct RedisConnCfg
{
RedisConnCfg()
{
id_ = 0;
port_ = 0;
}
void print()
{
std::cout << id_ << ":" << ip_ << ":" << port_ << std::endl;
}
size_t id_;
std::string ip_;
size_t port_;
};
class RedisConnCfgMgr : public Singleton<RedisConnCfgMgr>
{
friend class Singleton<RedisConnCfgMgr>;
public:
bool loadCfg();
const std::vector<RedisConnCfg>& getRedisConnCfg(){ return cfg_; }
private:
RedisConnCfgMgr();
~RedisConnCfgMgr();
RedisConnCfgMgr(const RedisConnCfgMgr&);
const RedisConnCfgMgr& operator=(const RedisConnCfgMgr& c);
std::vector<RedisConnCfg> cfg_;
};
#include "redisPool.h"
redisPool::redisPool()
{
}
redisPool::~redisPool()
{
delClient();
}
bool redisPool::addClient(size_t id, std::string ip, size_t port)
{
ConnectInfo* pInfo = getConnectInfo(id);
if(!pInfo)
{
pInfo = &connect_[id];
pInfo->con_ = new redisConn(id, ip.c_str(), port, "123456");
pInfo->state_ = 1;
}
if(!pInfo->con_->isRun())
{
if(pInfo->con_->connect() && pInfo->con_->auth())
{
return true;
}
else
{
pInfo->con_->disConnect();
return false;
}
}
return true;
}
bool redisPool::delClient()
{
ConnectInfoT::iterator it = connect_.begin();
for(; it != connect_.end(); ++it)
{
it->second.con_->disConnect();
it->second.state_ = 0;
SAFE_DELETE(it->second.con_);
it->second.con_ = nullptr;
}
connect_.clear();
}
redisConn* redisPool::getClientByID(size_t id)
{
ConnectInfoT::iterator it = connect_.find(id);
if(it != connect_.end())
{
return it->second.con_;
}
return nullptr;
}
ConnectInfo* redisPool::getConnectInfo(size_t id)
{
ConnectInfoT::iterator it = connect_.find(id);
if(it != connect_.end())
{
return &it->second;
}
return nullptr;
}
bool redisPool::keepAlive()
{
ConnectInfoT::iterator it = connect_.begin();
for(; it != connect_.end(); ++it)
{
it->second.con_->keepAlive();
}
}
bool redisPool::updateCfgConn()
{
//重置连接状态
ConnectInfoT::iterator it = connect_.begin();
for(; it != connect_.end(); ++it)
{
it->second.state_ = 0;
}
//更新配置中的redis到redisPool中
const std::vector<RedisConnCfg>& cfg_vec = RedisConnCfgMgr::getMe()->getRedisConnCfg();
for(int i = 0; i < cfg_vec.size(); ++i)
{
//在连接池中进行重连
const RedisConnCfg& tmp = cfg_vec[i];
it = connect_.find(tmp.id_);
if(it != connect_.end())
{
it->second.con_->resetReconnectTime();
it->second.con_->keepAlive();
it->second.state_ = 1;
}
else
{
//不在进行添加并连接redis
addClient(tmp.id_, tmp.ip_, tmp.port_);
}
}
//删除掉未连接的redis
it = connect_.begin();
for(; it != connect_.end(); )
{
if(!it->second.state_)
{
it->second.con_->disConnect();
SAFE_DELETE(it->second.con_);
connect_.erase(it++);
}
else
{
it++;
}
}
return true;
}
RedisConnCfgMgr::RedisConnCfgMgr()
{
}
RedisConnCfgMgr::~RedisConnCfgMgr()
{
}
bool RedisConnCfgMgr::loadCfg()
{
std::ifstream infile("redisCfg.txt");
if(!infile.is_open())
{
std::cout << "file open fail" << std::endl;
return false;
}
std::vector<std::string> cfg_vec;
char buf[1024];
while(infile.getline(buf, sizeof(buf)))
{
//std::cout << buf << std::endl;
char* p = strtok(buf, "-");
while(p)
{
cfg_vec.push_back(p);
p = strtok(NULL, "-");
}
if(cfg_vec.size() >= 3)
{
RedisConnCfg tmp;
tmp.id_ = atoi(cfg_vec[0].c_str());
tmp.ip_ = cfg_vec[1];
tmp.port_ = atoi(cfg_vec[2].c_str());
tmp.print();
cfg_.push_back(tmp);
}
cfg_vec.clear();
}
infile.close();
return true;
}
测试:
#include "redisPool.h"
void* setHandleFunc(void* arg)
{
//std::cout << pthread_self() << std::endl;
RedisConnCfg* tmp = (RedisConnCfg*)arg;
redisConn* conn = redisPool::getMe()->getClientByID(tmp->id_);
if(conn)
{
char kbuf[64];
char vbuf[64];
snprintf(kbuf, sizeof(kbuf), "k%d", tmp->id_);
snprintf(vbuf, sizeof(vbuf), "v%ld", pthread_self());
//std::cout << kbuf << "---" << vbuf << std::endl;
conn->set(kbuf, vbuf);
}
return nullptr;
}
void* getHandleFunc(void* arg)
{
//std::cout << pthread_self() << std::endl;
RedisConnCfg* tmp = (RedisConnCfg*)arg;
redisConn* conn = redisPool::getMe()->getClientByID(tmp->id_);
if(conn)
{
char kbuf[64];
snprintf(kbuf, sizeof(kbuf), "k%d", tmp->id_);
std::string str = conn->get(kbuf);
std::cout << kbuf << ":" << str << std::endl;
}
return nullptr;
}
int main()
{
//加载配置
RedisConnCfgMgr::getMe()->loadCfg();
redisPool::getMe()->updateCfgConn();
std::vector<pthread_t> pt_vec;
const std::vector<RedisConnCfg>& cfg_vec = RedisConnCfgMgr::getMe()->getRedisConnCfg();
for(int i = 0; i < cfg_vec.size(); ++i)
{
//创建线程
pthread_t pid = 0;
pthread_create(&pid, nullptr, setHandleFunc, (void*)&cfg_vec[i]);
pt_vec.push_back(pid);
}
for(int i = 0; i < pt_vec.size(); ++i)
{
pthread_detach(pt_vec[i]);
}
pt_vec.clear();
for(int i = 0; i < cfg_vec.size(); ++i)
{
//创建线程
pthread_t pid = 0;
pthread_create(&pid, nullptr, getHandleFunc, (void*)&cfg_vec[i]);
pt_vec.push_back(pid);
}
for(int i = 0; i < pt_vec.size(); ++i)
{
pthread_detach(pt_vec[i]);
}
while(1)
{}
return 0;
}
int main()
{
RedisConnCfgMgr rm;
rm.loadCfg();
return 0;
}
Makefile编译文件:
主要需要下载hiredis库。
main:main.cpp redisPool.cpp redisConn.cpp
g++ -g $^ -o $@ -std=c++11 -lhiredis -lpthread
.PHONY:clean
clean:
rm -rf main