项目:消息队列的前置知识

文章目录

  • 写在前面
    • 环境安装
  • Protubuf
    • 基本介绍
  • Muduo
    • 基本介绍
  • SQLite3
  • 异步操作实现线程池

本篇是对于一个仿RabbitMQ实现的消息队列项目的前置知识的说明文档

写在前面

环境安装

Protubuf

基本介绍

项目所需要的比较重要模块有Protubuf模块,那么下面先对于这个模块进行简单的介绍

Protubuf全称是Protocol Buffer,是一个数据结构序列化和反序列化的框架

基本的调用逻辑为,用户首先可以自定义一个proto文件,之后就会编译生成一些对应的文件,在这些文件中,就会包含有对应massage的一系列接口方法,比如说有序列化和反序列化的方法,而未来实际的业务处理代码就会依赖这些方法,进行一些对应的处理

首先定义一个proto文件

syntax = "proto3";

package contacts;

message contact {
    uint64 number = 1;
    float score = 2;
    string name = 3;
}

接着运行Protubuf的命令,生成对应的配置文件:

protoc --cpp_out=. contacts.proto

此时就会生成两个文件,一个是 .cc 文件,一个是 .h 文件,里面存放的内容当中就有我们需要的序列化和反序列化的头文件:

在这里插入图片描述
依据这个信息,就可以简单的实现一个序列化和反序列化的样例:

#include "contacts.pb.h"
using namespace std;

int main()
{
    contacts::contact conn;

    // 先把信息写进去
    conn.set_name("小明");
    conn.set_number(10001);
    conn.set_score(80.5);

    // 序列化一下
    string str = conn.SerializeAsString();
    cout << str << endl;

    // 反序列化出来
    contacts::contact resconn;
    resconn.ParseFromString(str);
    cout << resconn.name() << endl;
    cout << resconn.number() << endl;
    cout << resconn.score() << endl;

    return 0;
}

如上所示是一个基本的逻辑,运行结果:

test@VM-16-11-ubuntu:~/message-queue/demo/proto$ ./main 
# 这里是乱码是正常的,因为序列化出的结果应该是一个二进制文件
�N�B小明
小明
10001
80.5

由此可见也确实正确的处理出了结果信息

Muduo

基本介绍

这是一个基于Reactor实现的高并发服务器,具体的实现就不在这里体现了,我之前的一个项目做的正是这个,这里就贴个链接吧

SQLite3

这是一个轻量级的数据库,只有本地端,对比MySQL来说更适合当前的项目场景,这里就选择它来当做数据库,下面演示其基本用法

封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成数据的基础增删改查操作。

  1. 创建/打开数据库文件
  2. 针对打开的数据库执行操作
    1. 表的操作
    2. 数据的操作
  3. 关闭数据库
#include <iostream>
#include <string>
#include <sqlite3.h>
using namespace std;

class SqliteHelper 
{
public:
    typedef int(*SqliteCallback)(void*,int,char**,char**);
    SqliteHelper(const string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
    
    bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) 
    {
        //int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs );
        int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | safe_leve, nullptr);
        if (ret != SQLITE_OK) {
            cout << "创建/打开sqlite数据库失败: ";
            cout << sqlite3_errmsg(_handler) << endl;
            return false;
        }
        return true;
    }

    bool exec(const string &sql, SqliteCallback cb, void *arg) 
    {
        //int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**), void* arg, char **err)
        int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
        if (ret != SQLITE_OK) 
        {
            cout << sql << endl;
            cout << "执行语句失败: ";
            cout << sqlite3_errmsg(_handler) << endl;
            return false;
        }
        return true;
    }
    void close() 
    {
        //int sqlite3_close_v2(sqlite3*);
        if (_handler) sqlite3_close_v2(_handler);
    }
private:
    string _dbfile;
    sqlite3 *_handler;
};

异步操作实现线程池

future

在C++11标准库中,提供了一个future的模板类,它表示的是一个异步操作的结果,当在多线程编程中使用异步任务的时候,使用这个类可以帮助在需要的时候获取到对应的数据处理结果,而future类本质上的一个重要特性是可以阻塞当前线程,直到异步操作完成,而确保在获取结果的时候不会出现结果未完成的情况出现

下面来看官方文档对于这个类的讲述:

在这里插入图片描述

大体意思是,future会作为一个返回值来进行接收,可以通过下面的三种方式来进行调用,这里展示第一种:async

async

在这里插入图片描述
在这个函数当中,就是一个经典的调用异步操作来执行的操作,对于函数参数来说,Fn表示的是这是一个要执行的函数,后面的args表示的是这个函数的参数,而对于这个函数来说,它存在一种函数的重载,这个函数的重载可以在最前面加上一个调用的策略,可以使得是立刻进行执行和获取函数的返回值,或者是在调用get函数再进行函数返回值的获取,下面使用一个实例代码来进行演示

#include <chrono>
#include <iostream>
#include <future>
#include <thread>
using namespace std;

// 模拟一个加法的环境
int add(int num1, int num2)
{
    cout << "加法" << endl;
    return num1 + num2;
}

void deferred_solve()
{
    cout << "deferred" << endl;
    cout << "------1------" << endl;
    future<int> fut = async(launch::deferred, add, 10, 20);
    cout << "------2------" << endl;
    this_thread::sleep_for(chrono::seconds(1));
    cout << "------3------" << endl;
    int res = fut.get();
    cout << "------4------" << endl;
    cout << "运行结果" << res << endl;
}

void async_solve()
{
    cout << "async" << endl;
    cout << "------1------" << endl;
    future<int> fut = async(launch::async, add, 10, 20);
    cout << "------2------" << endl;
    this_thread::sleep_for(chrono::seconds(1));
    cout << "------3------" << endl;
    int res = fut.get();
    cout << "------4------" << endl;
    cout << "运行结果" << res << endl;
}

int main()
{
    cout << "deferred: " << endl;
    deferred_solve();
    cout << endl;
    cout << "async: " << endl;
    async_solve();
    cout << endl;
    return 0;
}

运行结果如下所示:

deferred: 
deferred
------1------
------2------
------3------
加法
------4------
运行结果30

async: 
async
------1------
------2------
加法
------3------
------4------
运行结果30

从上可以看出一些端倪,对于deferred这种策略来说,它的策略是在进行get方法的时候再进行资源的计算,而对于async这样的策略来说,更多的是在进行调用之后就会进行计算,在这种调用之后,会立刻再开一个工作线程把内容计算完毕后传递回主函数,这是两个基本的调用逻辑~

下面演示第二种获取异步结果的方式:promise

promise

#include <iostream>
#include <thread>
#include <future>

using namespace std;

void add(int num1, int num2, promise<int>& pro)
{
    pro.set_value(num1 + num2);
    return;
}

int main()
{
    promise<int> pro;
    future<int> fut = pro.get_future();
    thread th(add, 10, 20, ref(pro));
    int res = fut.get();
    cout << "执行结果: " << res << endl;
    th.join();
    return 0;
}

这个场景本质上就是利用了一个promise对象来和future对象建立了关系,如果在获取future对象的时候并没有发生值改变,就会阻塞等待,保证了异步的基本进行

package task

下面说的是最后一种获取异步结果的方式:package task

对于这种调用的方式,可以把它生成的对象当成是一个可调用对象,下面演示其基本用法

#include <iostream>
#include <thread>
#include <future>
#include <memory>
using namespace std;

int add(int num1, int num2)
{
    return num1 + num2;
}

int main()
{
    auto ptask = make_shared<packaged_task<int(int, int)>>(add);
    future<int> fut = ptask->get_future();
    thread th([ptask](){
        (*ptask)(10, 20);
    });
    int sum = fut.get();
    cout << sum << endl;
    th.join();
    return 0;
}

C++11线程池实现

下面就基于上述的这三种当中的package来实现一个C++11线程池,其中会把相关的讲解和注释的信息放到代码片段中,这里直接展示对应的代码段

#include <features.h>
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>
using namespace std;

class threadpool
{
    using Functor = function<void(void)>;
public:
    threadpool(int count = 1) : _stop(false)
    {
        for(int i = 0; i < count; i++)
            _threads.emplace_back(&threadpool::entry, this);
    }
    ~threadpool()
    { 
        stop();
    }
    void stop()
    {
        if(_stop == true)
            return;
        _stop = true;
        // 唤醒线程
        _cv.notify_all();
        // 回收线程
        for(auto& thread: _threads)
            thread.join();
    }
    // 对于push函数,传入的是一个用户要执行的函数,还有函数的参数
    // push函数的内部,会把这些传入的函数和参数封装为一个packaged_task
    // 然后使用lambda表达式生成一个可调用对象,放到任务池中,让工作线程取出执行
    template<typename F, typename ...Args>
    auto push(const F&& func, Args&& ...args) -> future<decltype(func(args...))>
    {
        // 1. 把传入的函数封装为一个packaged任务
        // 把返回类型获取出来
        using return_type = decltype(func(args...));
        // 把函数对象和函数参数绑定到一起
        auto tmp_func = bind(forward<F>(func), forward<Args>(args)...);
        // 把整体的tmp_func绑定成一个任务
        auto task = make_shared<packaged_task<return_type()>>(tmp_func);
        future<return_type> fut = task->get_future();
        // 2. 构造一个lambda匿名函数,函数内执行任务对象
        {
            unique_lock<mutex> lock(_mutex);
            // 3. 把匿名函数对象放到任务池中
            _taskpool.push_back([task](){ (*task)(); });
            _cv.notify_one();
        }
        return fut;
    }
private:
    void entry()
    {
        while(!_stop)
        {
            vector<Functor> tmp_taskpool;
            {
                unique_lock<mutex> lock(_mutex);
                _cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });
                tmp_taskpool.swap(_taskpool);
            }
            for(auto& task : tmp_taskpool)
                task();
        }
    }
private:
    atomic<bool> _stop;
    vector<Functor> _taskpool;
    mutex _mutex;
    condition_variable _cv;
    vector<thread> _threads;
};

int add(int num1, int num2)
{
    return num1 + num2;
}

int main()
{
    threadpool pool;
    for(int i = 0; i < 10; i++)
    {
        future<int> fut = pool.push(add, 10, i);
        cout << fut.get() << endl;
    }
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/675652.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

28 _ WebComponent:像搭积木一样构建Web应用

在上一篇文章中我们从技术演变的角度介绍了PWA&#xff0c;这是一套集合了多种技术的理念&#xff0c;让浏览器渐进式适应设备端。今天我们要站在开发者和项目角度来聊聊WebComponent&#xff0c;同样它也是一套技术的组合&#xff0c;能提供给开发者组件化开发的能力。 那什么…

微信支付(可复用)

3.1微信支付 本项目选择小程序支付 参考&#xff1a;产品中心 - 微信支付商户平台微信支付商户平台提供各类支付产品满足商家通过微信支付收款的需求&#xff1b;平台提供智慧经营&#xff0c;现金红包&#xff0c;代金券等运营工具&#xff0c;助力商家更好的玩转营销&#x…

重生奇迹mu格斗家介绍

出生地&#xff1a;勇者大陆 性 别&#xff1a;男 擅 长&#xff1a;近距离攻击、技能以PVP为主战斗风格 转 职&#xff1a;格斗大师&#xff08;3转&#xff09; 介 绍&#xff1a;以PVP战斗模式为主的格斗家&#xff0c;依角色养成配点不同&#xff0c;可发展成以力量体力…

恒创科技:无法与服务器建立安全连接怎么解决?

在使用互联网服务时&#xff0c;有时会出现无法与服务器建立安全连接的问题&#xff0c;此错误消息通常出现在尝试访问需要安全连接的网站(例如使用 HTTPS 的网站)时&#xff0c;这可能是由于多种原因造成的&#xff0c;以下是一些常见的解决方法&#xff0c;帮助你解决问题。 …

AI来了,产品经理该怎样面对它?

AI终于来了&#xff0c;我们一方面期待着它可能给我们生活带来的变化&#xff0c;另一方面又担忧它可能带给我们巨大的风险和挑战。 AI带来的影响 AI不确定性的风险有很多&#xff0c;例如有人关注它是否成为“奥创”&#xff0c;但对我们大多数人来说这样的风险还很遥远&#…

Java1.8+ idea hbuilder+ uniapp、vue上门家政小程序APP源码开发

Java1.8 idea hbuilder uniapp、vue上门家政小程序APP源码开发 家政服务系统是一种专为家庭提供全方位服务的综合性系统。该系统通过整合多种服务功能和智能化管理&#xff0c;旨在提高家庭生活的质量和效率。 家政服务系统技术开发环境&#xff1a; 技术架构&#xff1a;spri…

怎么制作在线研学活动报名系统?教你快速搞定

易查分小程序&#xff1a;提升研学活动体验&#xff0c;智慧管理新选择 在教育多元化的今天&#xff0c;学校组织的研学活动可以为学生提供更多实践学习、探索世界的机会。不过&#xff0c;对于老师来说&#xff0c;活动的报名和管理常常比较复杂&#xff0c;导致工作量增加。…

工业相机识别电路板元器件:彩色与黑白的区别

工业相机用于识别电路板上的元器件时&#xff0c;选择彩色相机或黑白相机取决于具体应用需求和条件。彩色相机能提供更丰富的信息&#xff0c;但处理复杂度较高&#xff1b;黑白相机则在处理速度和精度上具有优势。理解它们的区别和各自的优缺点&#xff0c;有助于在具体项目中…

软件功能测试内容简析,第三方软件测试机构进行功能测试的好处

软件功能测试是指对软件产品的各项功能进行验证和确认的过程。它是软件开发过程中非常重要的一环&#xff0c;通过对软件的功能进行全面测试&#xff0c;可以确保软件在交付给用户之前达到预期的质量要求。 在进行功能测试时&#xff0c;需要包括以下几个方面的测试内容&#…

docker运行centos提示Operation not permitted

1、在docker中运行了centos7镜像 2、进入到centos容器中使用systemctl命令时提示 systemctl Failed to get D-Bus connection: Operation not permitted 3、解决办法 在运行centos镜像的时候加上--privileged参数 4、附上docker官网命令说明截图

驱动芯片退饱和保护(DESAT)

驱动芯片退饱和保护&#xff08;DESAT&#xff09; 1.概述2.短路能力评估3.驱动芯片的退饱和保护功能介绍3.1 退饱和工作原理3.2 退饱和电路的关键组成和影响因素 4.驱动芯片的退饱和保护功能的调试4.1 如何增加 DESAT 充电电流4.2 如何调整 DESAT 阈值电压4.3 如何使用 OC 功能…

Chrome 调试技巧

1. alert 在最早的时候&#xff0c;javascript 程序员调试代码都是通过 alert 进行&#xff0c;但 alert 会让整个程序被打断&#xff0c;并且还有一个很大的缺点&#xff0c;调试完成之后&#xff0c;如果忘记将 alert 删除 or 注释掉&#xff0c;导致别人访问该页面时会莫名…

基于System-Verilog实现DE2-115开发板驱动HC_SR04超声波测距

目录 前言 一、SystemVerilog——下一代硬件设计语言 与Verilog关系 与SystemC关系 二、实验原理 2.1 传感器概述&#xff1a; 2.2 传感器引脚 2.3 传感器工作原理 2.4 整体测距原理及编写思路 三、System-Verilog文件 3.1 时钟分频 3.2 超声波测距 3.3 数码管驱动…

【EI会议/稳定检索】2024年机械、传感与自动控制国际会议(MSAC 2024)

2024 International Conference on Machinery, Sensing, and Automatic Control 2024年机械、传感与自动控制国际会议 【会议信息】 会议简称&#xff1a;MSAC 2024 大会地点&#xff1a;中国贵阳 会议官网&#xff1a;www.msaciac.com 会议邮箱&#xff1a;msacsub-paper.com…

TMS320F280049 ECAP模块--总览(0)

ECAP 特性&#xff1a; 4个32bit的事件时间戳寄存器&#xff1b; 4个连续时间戳捕获事件的边沿极性可选上升沿、下降沿 4个事件中每个都能触发中断 4个事件都能做单词触发 可以连续捕获4个事件 绝对的捕获时间戳 差异模式捕获 不使用捕获模式时&#xff0c;可以配置输出…

【设计模式】JAVA Design Patterns——Factory Method(虚拟构造器模式)

&#x1f50d;目的 为创建一个对象定义一个接口&#xff0c;但是让子类决定实例化哪个类。工厂方法允许类将实例化延迟到子类 &#x1f50d;解释 真实世界例子 铁匠生产武器。精灵需要精灵武器&#xff0c;而兽人需要兽人武器。根据客户来召唤正确类型的铁匠。 通俗描述 它为类…

esp32芯片选型网页链接

![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/bd340b1360dc45ec9a4286f64c95b39d.png esp32芯片选型网页链接

采用C#、Python和Qt开发上位机好看的界面也不是多的事儿

采用C#、Python和Qt开发上位机好看的界面也不是多的事儿

iOS18新功能大爆料,打破常规,全面升级,这些变化不容错过!

众所周知&#xff0c;苹果 iOS 操作系统近年来都没有发生重大变化&#xff0c;主要是添加小部件、锁屏编辑和手机屏幕编辑等功能&#xff0c;再加上bug偏多&#xff0c;以至于越来越多iPhone用户不愿意再升级系统了。这一点&#xff0c;从 iOS 17 明显降低的安装率中就能看出一…

他人项目二次开发——慎接

接了一个朋友的项目——开发及运营迭代差不多2年多了&#xff0c;整体样子移动端和PC都能正常使用&#xff0c;但后期的扩展性及新功能添加出现瓶颈。 因此给了一部分钱&#xff0c;让我接手来开发——重构架构。 背景说明 朋友公司的技术人员是我帮忙招聘的&#xff0c;相关技…