文章目录
- 写在前面
- 环境安装
- Protubuf
- 基本介绍
- Muduo
- 基本介绍
- SQLite3
- 异步操作实现线程池
本篇是对于一个仿RabbitMQ实现的消息队列项目的前置知识的说明文档
写在前面
环境安装
Protubuf
基本介绍
项目所需要的比较重要模块有Protubuf模块,那么下面先对于这个模块进行简单的介绍
Protubuf全称是Protocol Buffer,是一个数据结构序列化和反序列化的框架
基本的调用逻辑为,用户首先可以自定义一个proto文件,之后就会编译生成一些对应的文件,在这些文件中,就会包含有对应massage的一系列接口方法,比如说有序列化和反序列化的方法,而未来实际的业务处理代码就会依赖这些方法,进行一些对应的处理
首先定义一个proto文件
syntax = "proto3";
package contacts;
message contact {
uint64 number = 1;
float score = 2;
string name = 3;
}
接着运行Protubuf的命令,生成对应的配置文件:
protoc --cpp_out=. contacts.proto
此时就会生成两个文件,一个是 .cc 文件,一个是 .h 文件,里面存放的内容当中就有我们需要的序列化和反序列化的头文件:
依据这个信息,就可以简单的实现一个序列化和反序列化的样例:
#include "contacts.pb.h"
using namespace std;
int main()
{
contacts::contact conn;
// 先把信息写进去
conn.set_name("小明");
conn.set_number(10001);
conn.set_score(80.5);
// 序列化一下
string str = conn.SerializeAsString();
cout << str << endl;
// 反序列化出来
contacts::contact resconn;
resconn.ParseFromString(str);
cout << resconn.name() << endl;
cout << resconn.number() << endl;
cout << resconn.score() << endl;
return 0;
}
如上所示是一个基本的逻辑,运行结果:
test@VM-16-11-ubuntu:~/message-queue/demo/proto$ ./main
# 这里是乱码是正常的,因为序列化出的结果应该是一个二进制文件
�N�B小明
小明
10001
80.5
由此可见也确实正确的处理出了结果信息
Muduo
基本介绍
这是一个基于Reactor实现的高并发服务器,具体的实现就不在这里体现了,我之前的一个项目做的正是这个,这里就贴个链接吧
SQLite3
这是一个轻量级的数据库,只有本地端,对比MySQL来说更适合当前的项目场景,这里就选择它来当做数据库,下面演示其基本用法
封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成数据的基础增删改查操作。
- 创建/打开数据库文件
- 针对打开的数据库执行操作
- 表的操作
- 数据的操作
- 关闭数据库
#include <iostream>
#include <string>
#include <sqlite3.h>
using namespace std;
class SqliteHelper
{
public:
typedef int(*SqliteCallback)(void*,int,char**,char**);
SqliteHelper(const string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX)
{
//int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);
if (ret != SQLITE_OK) {
cout << "创建/打开sqlite数据库失败: ";
cout << sqlite3_errmsg(_handler) << endl;
return false;
}
return true;
}
bool exec(const string &sql, SqliteCallback cb, void *arg)
{
//int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
if (ret != SQLITE_OK)
{
cout << sql << endl;
cout << "执行语句失败: ";
cout << sqlite3_errmsg(_handler) << endl;
return false;
}
return true;
}
void close()
{
//int sqlite3_close_v2(sqlite3*);
if (_handler) sqlite3_close_v2(_handler);
}
private:
string _dbfile;
sqlite3 *_handler;
};
异步操作实现线程池
future
在C++11标准库中,提供了一个future的模板类,它表示的是一个异步操作的结果,当在多线程编程中使用异步任务的时候,使用这个类可以帮助在需要的时候获取到对应的数据处理结果,而future类本质上的一个重要特性是可以阻塞当前线程,直到异步操作完成,而确保在获取结果的时候不会出现结果未完成的情况出现
下面来看官方文档对于这个类的讲述:
大体意思是,future会作为一个返回值来进行接收,可以通过下面的三种方式来进行调用,这里展示第一种:async
async
在这个函数当中,就是一个经典的调用异步操作来执行的操作,对于函数参数来说,Fn表示的是这是一个要执行的函数,后面的args表示的是这个函数的参数,而对于这个函数来说,它存在一种函数的重载,这个函数的重载可以在最前面加上一个调用的策略,可以使得是立刻进行执行和获取函数的返回值,或者是在调用get函数再进行函数返回值的获取,下面使用一个实例代码来进行演示
#include <chrono>
#include <iostream>
#include <future>
#include <thread>
using namespace std;
// 模拟一个加法的环境
int add(int num1, int num2)
{
cout << "加法" << endl;
return num1 + num2;
}
void deferred_solve()
{
cout << "deferred" << endl;
cout << "------1------" << endl;
future<int> fut = async(launch::deferred, add, 10, 20);
cout << "------2------" << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "------3------" << endl;
int res = fut.get();
cout << "------4------" << endl;
cout << "运行结果" << res << endl;
}
void async_solve()
{
cout << "async" << endl;
cout << "------1------" << endl;
future<int> fut = async(launch::async, add, 10, 20);
cout << "------2------" << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "------3------" << endl;
int res = fut.get();
cout << "------4------" << endl;
cout << "运行结果" << res << endl;
}
int main()
{
cout << "deferred: " << endl;
deferred_solve();
cout << endl;
cout << "async: " << endl;
async_solve();
cout << endl;
return 0;
}
运行结果如下所示:
deferred:
deferred
------1------
------2------
------3------
加法
------4------
运行结果30
async:
async
------1------
------2------
加法
------3------
------4------
运行结果30
从上可以看出一些端倪,对于deferred这种策略来说,它的策略是在进行get方法的时候再进行资源的计算,而对于async这样的策略来说,更多的是在进行调用之后就会进行计算,在这种调用之后,会立刻再开一个工作线程把内容计算完毕后传递回主函数,这是两个基本的调用逻辑~
下面演示第二种获取异步结果的方式:promise
promise
#include <iostream>
#include <thread>
#include <future>
using namespace std;
void add(int num1, int num2, promise<int>& pro)
{
pro.set_value(num1 + num2);
return;
}
int main()
{
promise<int> pro;
future<int> fut = pro.get_future();
thread th(add, 10, 20, ref(pro));
int res = fut.get();
cout << "执行结果: " << res << endl;
th.join();
return 0;
}
这个场景本质上就是利用了一个promise对象来和future对象建立了关系,如果在获取future对象的时候并没有发生值改变,就会阻塞等待,保证了异步的基本进行
package task
下面说的是最后一种获取异步结果的方式:package task
对于这种调用的方式,可以把它生成的对象当成是一个可调用对象,下面演示其基本用法
#include <iostream>
#include <thread>
#include <future>
#include <memory>
using namespace std;
int add(int num1, int num2)
{
return num1 + num2;
}
int main()
{
auto ptask = make_shared<packaged_task<int(int, int)>>(add);
future<int> fut = ptask->get_future();
thread th([ptask](){
(*ptask)(10, 20);
});
int sum = fut.get();
cout << sum << endl;
th.join();
return 0;
}
C++11线程池实现
下面就基于上述的这三种当中的package来实现一个C++11线程池,其中会把相关的讲解和注释的信息放到代码片段中,这里直接展示对应的代码段
#include <features.h>
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>
using namespace std;
class threadpool
{
using Functor = function<void(void)>;
public:
threadpool(int count = 1) : _stop(false)
{
for(int i = 0; i < count; i++)
_threads.emplace_back(&threadpool::entry, this);
}
~threadpool()
{
stop();
}
void stop()
{
if(_stop == true)
return;
_stop = true;
// 唤醒线程
_cv.notify_all();
// 回收线程
for(auto& thread: _threads)
thread.join();
}
// 对于push函数,传入的是一个用户要执行的函数,还有函数的参数
// push函数的内部,会把这些传入的函数和参数封装为一个packaged_task
// 然后使用lambda表达式生成一个可调用对象,放到任务池中,让工作线程取出执行
template<typename F, typename ...Args>
auto push(const F&& func, Args&& ...args) -> future<decltype(func(args...))>
{
// 1. 把传入的函数封装为一个packaged任务
// 把返回类型获取出来
using return_type = decltype(func(args...));
// 把函数对象和函数参数绑定到一起
auto tmp_func = bind(forward<F>(func), forward<Args>(args)...);
// 把整体的tmp_func绑定成一个任务
auto task = make_shared<packaged_task<return_type()>>(tmp_func);
future<return_type> fut = task->get_future();
// 2. 构造一个lambda匿名函数,函数内执行任务对象
{
unique_lock<mutex> lock(_mutex);
// 3. 把匿名函数对象放到任务池中
_taskpool.push_back([task](){ (*task)(); });
_cv.notify_one();
}
return fut;
}
private:
void entry()
{
while(!_stop)
{
vector<Functor> tmp_taskpool;
{
unique_lock<mutex> lock(_mutex);
_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });
tmp_taskpool.swap(_taskpool);
}
for(auto& task : tmp_taskpool)
task();
}
}
private:
atomic<bool> _stop;
vector<Functor> _taskpool;
mutex _mutex;
condition_variable _cv;
vector<thread> _threads;
};
int add(int num1, int num2)
{
return num1 + num2;
}
int main()
{
threadpool pool;
for(int i = 0; i < 10; i++)
{
future<int> fut = pool.push(add, 10, i);
cout << fut.get() << endl;
}
return 0;
}