C++使用线程池模拟异步事件处理机制

  在C++很多框架中都有异步事件处理机制,这导致我们在看源码时经常很疑惑,难以理解,而其中包含的编程套路可能是一些成熟的技术,只是我们不熟悉,比如WebRTC中类似于Qt的信号槽机制,线程事件处理, 或者使用系统异步IO等等,如果看不懂这些套路,理解代码会很难,本篇博客来尝使用用C++线程池实现一种异步事件处理机制。

异步事件处理机制的基本实现

  C++可以使用std::future和std::promise来实现异步操作。然而,为了实现一个异步事件绑定的框架,我们需要更复杂的设计。下面是一个简单的例子,说明如何实现一个异步事件处理器。

  首先,定义一个事件处理器类,该类将接收并处理事件:

class EventHandler {
public:
    virtual ~EventHandler() = default;
    virtual void handleEvent(int eventID) = 0;
};

  然后,我们需要创建一个事件分发器,它将异步地调用事件处理器:

/*

事件注册,分发

*/

#pragma once

#include "EventHandler.hpp"
#include <map>
#include <thread>
#include <future>
#include <functional>
#include <memory>

class EventDispatcher {
public:
    // 注册事件处理器
    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        handlers[eventID] = handler;
    }

    // 异步事件分发函数
    void postEvent(int eventID) {
        auto it = handlers.find(eventID);
        if (it != handlers.end()) {
            std::thread eventThread(&EventDispatcher::dispatchEvent, this, it->second, eventID);
            eventThread.detach();
        }
    }

private:
    // 事件分发函数
    void dispatchEvent(std::shared_ptr<EventHandler> handler, int eventID) {
        handler->handleEvent(eventID);
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> handlers;  // 存储事件,int 事件id, std::shared_ptr<EventHandler> 事件处理器
};

  在这个例子中,EventDispatcher类的postEvent方法接收一个事件ID,并在新线程中调用相应的事件处理器。这样做可以实现事件的异步处理。

  然后,你可以创建一个或多个处理器类,比如下面的打印事件处理器PrintEventHandler ,它实现EventHandler接口,

/*

具体的事件处理器

*/

#include "EventHandler.hpp"
#include <iostream>

using namespace std;

class PrintEventHandler : public EventHandler {
public:
    void handleEvent(int eventID) override {
        std::cout << "Handling event " << eventID << std::endl;
    }
};

  然后再main函数中进行注册:

/*

C++异步事件框架demo01

*/


#include <iostream>
#include <memory>
#include <thread>
#include <chrono>
#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"

int main() {
    EventDispatcher dispatcher;
    
    std::shared_ptr<EventHandler> printHandler = std::make_shared<PrintEventHandler>();
    dispatcher.registerHandler(1, printHandler);

    dispatcher.postEvent(1);

    // Sleep main thread to let the event thread finish.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

运行结果:

Handling event 1

代码组织如下,有兴趣的可以自行编写实现:
在这里插入图片描述

cmake脚本

#[[

    编译方法
    cmake -S . -B build
    cd build
    make
    ./demo01

]]

cmake_minimum_required(VERSION 3.20)

project(demo01)

set(INCLUDE_PATH1  "./")

# 添加头文件目录
include_directories(
    ${INCLUDE_PATH1}
)

# 添加子目录src
aux_source_directory("./" SRC)

add_executable(demo01 ${SRC})

  这个实现是非常基础的,并没有考虑到线程安全问题和异常处理等等。在实际的项目中,你需要更复杂的设计,并使用更高级的并发编程技术,如线程池、任务队列、互斥锁等等。

添加线程池、任务队列

  如果想要更复杂的设计,包括线程池、任务队列、互斥锁等,你可以考虑使用以下的设计。下面的例子使用了C++17的std::async和std::future来实现线程池和任务队列。

  首先,我们需要一个线程安全的任务队列:

#pragma once

#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue<T> &) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue<T> &) = delete;

    void push(T value) {
        std::lock_guard<std::mutex> lock(mMutex);
        mQueue.push(std::move(value));
        mCondition.notify_one();
    }

    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(mMutex);
        if (mQueue.empty()) {
            return false;
        }
        
        value = std::move(mQueue.front());
        mQueue.pop();
        return true;
    }

    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCondition.wait(lock, [this](){ return !mQueue.empty(); });
        value = std::move(mQueue.front());
        mQueue.pop();
    }

private:
    std::queue<T> mQueue;
    std::mutex mMutex;
    std::condition_variable mCondition;
};

  然后,我们需要一个线程池来处理这些任务:

#pragma once

#include "ThreadSafeQueue.hpp"
#include <vector>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads) {
        start(numThreads);
    }

    ~ThreadPool() {
        stop();
    }

    template<typename T>
    void enqueue(T task) {
        mTasks.push(std::make_shared<std::packaged_task<void()>>(std::move(task)));
    }

private:
    std::vector<std::thread> mThreads;
    ThreadSafeQueue<std::shared_ptr<std::packaged_task<void()>>> mTasks;
    std::atomic<bool> mContinue { true };

    void start(size_t numThreads) {
        for (auto i = 0u; i < numThreads; ++i) {
            mThreads.emplace_back([=] {
                while (mContinue) {
                    std::shared_ptr<std::packaged_task<void()>> task;
                    if (mTasks.try_pop(task)) {
                        (*task)();
                    } else {
                        std::this_thread::yield();
                    }
                }
            });
        }
    }

    void stop() noexcept {
        mContinue = false;
        for (auto &thread : mThreads) {
            thread.join();
        }
    }
};

  然后,我们可以在EventDispatcher中使用线程池来异步地处理事件:

#pragma once
#include "ThreadPool.hpp"
#include <mutex>
#include <map>
#include <memory>
#include "EventHandler.hpp"

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                handler->handleEvent(eventID);
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

  这个实现现在是线程安全的,并且使用了线程池来处理事件。对于异常处理,你可以在EventHandler的handleEvent方法内部进行。

添加异常处理

  如何在EventDispatcher中处理异常?
EventDispatcher中处理异常,我们可以在任务的执行过程中增加一个try/catch块,捕获并处理可能出现的异常。下面是一个示例:

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                try {
                    handler->handleEvent(eventID);
                } catch (const std::exception &ex) {
                    std::cerr << "Exception occurred while handling event " << eventID 
                              << ": " << ex.what() << '\n';
                } catch (...) {
                    std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';
                }
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

  在这个示例中,如果handleEvent方法抛出了一个异常,那么我们将在控制台上打印出一个错误消息。具体如何处理这些异常,取决于你的应用程序的特定需求。

  请注意,这种方法仅处理了在handleEvent方法中抛出的异常。如果在其他地方(如ThreadPool的构造函数或enqueue方法)抛出异常,你可能需要额外的代码来处理这些情况。

注册多个事件

  在我们的事件分发器设计中,注册多个事件是非常简单的。你只需要创建不同的事件处理器,然后使用不同的事件ID将它们注册到事件分发器中。以下是一个示例:

class PrintEventHandler : public EventHandler {
public:
    PrintEventHandler(int id) : id(id) {}
    void handleEvent(int eventID) override {
        std::cout << "Handler " << id << " handling event " << eventID << std::endl;
    }
private:
    int id;
};

int main() {
    EventDispatcher dispatcher(4);  // 创建一个有4个线程的事件分发器

    std::shared_ptr<EventHandler> handler1 = std::make_shared<PrintEventHandler>(1);
    dispatcher.registerHandler(1, handler1);

    std::shared_ptr<EventHandler> handler2 = std::make_shared<PrintEventHandler>(2);
    dispatcher.registerHandler(2, handler2);

    dispatcher.postEvent(1);
    dispatcher.postEvent(2);

    // Sleep main thread to let the event threads finish
    std::this_thread::sleep_for(std::chrono::seconds(1));

    return 0;
}

  在这个示例中,我们创建了两个PrintEventHandler对象,每个对象都有一个唯一的ID。然后,我们使用不同的事件ID将这两个处理器注册到EventDispatcher中。当我们发布事件时,相应的处理器将会被调用。

  这就是如何在我们的异步事件绑定框架中注册多个事件的方法。

如何取消注册

  要从事件分发器中取消注册一个事件处理器,你可以在EventDispatcher类中添加一个新的方法,例如unregisterHandler,用于从处理器映射中删除指定的事件ID。下面是如何实现这个函数的例子:

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void unregisterHandler(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers.erase(eventID);
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            mThreadPool.enqueue([handler = it->second, eventID]() {
                try {
                    handler->handleEvent(eventID);
                } catch (const std::exception &ex) {
                    std::cerr << "Exception occurred while handling event " << eventID 
                              << ": " << ex.what() << '\n';
                } catch (...) {
                    std::cerr << "Unknown exception occurred while handling event " << eventID << '\n';
                }
            });
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

  这个unregisterHandler函数将删除与给定事件ID关联的事件处理器。请注意,这个函数并不会停止已经在处理该事件的任何线程。如果你想要取消正在进行的事件处理,你可能需要一个更复杂的设计,例如使用std::futurestd::promise来控制线程的执行。

如何停止正在进行的事件

  要停止正在进行的事件处理,我们需要更复杂的设计,它可能包括使用std::futurestd::promise来控制线程的执行。在这种设计中,每当一个事件被发布时,我们将创建一个std::promise,并将相应的std::future存储在某个地方,以便我们可以稍后在需要时停止事件处理。

  但是,要注意的是,根据C++的设计,没有一个简单且安全的方法可以强制停止正在运行的线程,因为这可能会导致资源泄漏或其他未定义的行为。因此,更常见的做法是让事件处理器定期检查一个“停止标记”,然后在检查到该标记时优雅地停止执行。以下是一个简单的示例,演示了如何实现这种设计:

class StoppableEvent {
public:
    StoppableEvent(std::future<void> future, std::function<void()> func)
        : mFuture(std::move(future)), mFunc(std::move(func)) {}

    void operator()() {
        while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) {
            mFunc();
        }
    }

private:
    std::future<void> mFuture;
    std::function<void()> mFunc;
};

class EventDispatcher {
public:
    EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}

    void registerHandler(int eventID, std::shared_ptr<EventHandler> handler) {
        std::lock_guard<std::mutex> lock(mMutex);
        mHandlers[eventID] = handler;
    }

    void postEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mHandlers.find(eventID);
        if (it != mHandlers.end()) {
            std::promise<void> stopSignal;
            auto stopFuture = stopSignal.get_future();
            mStopSignals[eventID] = std::move(stopSignal);
            mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() {
                handler->handleEvent(eventID);
            }));
        }
    }

    void stopEvent(int eventID) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mStopSignals.find(eventID);
        if (it != mStopSignals.end()) {
            it->second.set_value();
            mStopSignals.erase(it);
        }
    }

private:
    std::map<int, std::shared_ptr<EventHandler>> mHandlers;
    std::map<int, std::promise<void>> mStopSignals;
    ThreadPool mThreadPool;
    std::mutex mMutex;
};

  在这个例子中,我们定义了一个StoppableEvent类,它将一个std::future和一个函数组合在一起。当operator()被调用时,它将定期检查future,如果future的状态不是timeout,则停止执行函数。

  然后,当我们在EventDispatcher中发布一个事件时,我们将创建一个新的std::promise和相应的std::future,并将这个future和事件处理器的handleEvent方法一起传递给StoppableEvent。我们还将promise存储在一个映射中,以便我们可以稍后通过调用set_value来发出停止信号。

  最后,我们添加了一个stopEvent方法,它将查找与给定事件ID关联的promise,并通过调用set_value来发出停止信号。然后,它将从映射中删除这个promise,因为我们不再需要它。

  这是一个基本的示例,你可能需要根据你的具体需求来修改和扩展它。请注意,这个设计假设事件处理器的handleEvent方法将被调用多次,每次调用都可能被中断。如果你的事件处理器只执行一次长时间运行的任务,那么这个设计可能并不适合。

  以上是一个简易的异步事件处理demo, 在项目开发中,需要根据具体的业务需求进行调整完善。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/132099.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CS224W5.2——Relational and Iterative Classification

本节中&#xff0c;我们介绍用于节点分类的关系分类器和迭代分类。 从关系分类器开始&#xff0c;我们展示了如何基于邻居的标签迭代更新节点标签的概率。然后讨论迭代分类&#xff0c;通过根据邻居的标签及其特征预测节点标签来改进集体分类。 文章目录 1. 框架2. 关系分类3.…

软件测试之Web自动化测试,Web自动化测试的详细流程和步骤

一、什么是web自动化测试 自动化&#xff08;Automation&#xff09;是指机器设备、系统或过程&#xff08;生产、管理过程&#xff09;在没有人或较少人的直接参与下&#xff0c;按照人的要求&#xff0c;经过自动检测、信息处理、分析判断、操纵控制&#xff0c;实现预期的目…

JUC下常见类

JUC(java.util.concurrent) 的常见类ReentrantLock原子类线程池信号量SemaphoreCountDownLatch JUC(java.util.concurrent) 的常见类 ReentrantLock ReentrantLock可重入互斥锁. 和 synchronized 定位类似, 都是用来实现互斥效果, 保证线程安全。 用法: lock(): 加锁, 如果获…

【开源】基于Vue.js的大学兼职教师管理系统的设计和实现

目录 一、摘要1.1 项目介绍1.2 项目详细录屏 二、研究内容三、界面展示3.1 登录注册3.2 学生教师管理3.3 课程管理模块3.4 授课管理模块3.5 课程考勤模块3.6 课程评价模块3.7 课程成绩模块3.8 可视化图表 四、免责说明 一、摘要 1.1 项目介绍 大学兼职教师管理系统&#xff0…

Arduino、arm、树莓派、单片机四者有什么不同

文章目录 ArduinoARM树莓派单片机 初学单片机的同学&#xff0c;可能会对Arduino、ARM、树莓派以及单片机这些概念比较模糊&#xff0c;实际上&#xff0c;这四个是不同的概念和技术。 Arduino Arduino&#xff08;阿尔杜伊诺&#xff09;是一种开源电子原型平台&#xff0c;它…

基于SSM的房屋租售信息管理系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

人工智能基础_机器学习022_使用正则化_曼哈顿距离_欧氏距离_提高模型鲁棒性_过拟合_欠拟合_正则化提高模型泛化能力---人工智能工作笔记0062

然后我们再来看一下,过拟合和欠拟合,现在,实际上欠拟合,出现的情况已经不多了,欠拟合是 在训练集和测试集的准确率不高,学习不到位的情况. 然后现在一般碰到的是过拟合,可以看到第二个就是,完全就把红点蓝点分开了,这种情况是不好的, 因为分开是对训练数据进行分开的,如果来…

Vatee万腾科技决策力的未来展望:开创数字化创新的新高度

随着科技不断演进&#xff0c;Vatee万腾的科技决策力在数字化创新领域展现出了强大的潜力和前瞻性。 Vatee万腾的科技决策力被视为数字化创新的引擎&#xff0c;为未来创新注入了新的动力。通过深刻的市场洞察和科学决策&#xff0c;Vatee万腾致力于推动数字化创新走向新的高度…

mycat2 读写分离

mycat2 读写分离 mycat2 读写分离1.创建两个主从复制的数据库2.mycat2 读写分离3.mycat2读写分离测试 mycat2 读写分离 1.创建两个主从复制的数据库 参考&#xff1a;mysql主从复制 2.mycat2 读写分离 连接到mycat数据库 1.在mycat中创建数据库mydb1 CREATE DATABASE mydb…

linux发展史(必看系列)

Linux介绍&#xff1a; Linux&#xff0c;Linux Is Not UniX 的首字母缩写。是一款开源的&#xff0c;能自由传播的类Unix的操作系统&#xff0c;其内核由林纳斯本纳第克特托瓦兹&#xff08;Linus Benedict Torvalds&#xff09;于1991年10月5日首次发布&#xff0c;它主要受到…

iOS 17.2更新:15Pro支持拍摄空间视频!

苹果又为开发者预览版用户推送了iOS 17.2 Beta2测试版的更新&#xff0c;已经注册Apple Beta版软件计划的用户只需打开设置--通用--软件更新即可在线OTA升级至最新的iOS 17.2测试版。 本次更新包大小为750M左右&#xff0c;内部版本号为&#xff08;21C5040g&#xff09;&#…

山西电力市场日前价格预测【2023-11-12】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-11-12&#xff09;山西电力市场全天平均日前电价为224.59元/MWh。其中&#xff0c;最高日前电价为434.30元/MWh&#xff0c;预计出现在18:00。最低日前电价为0.00元/MWh&#xff0c;预计出…

前端开发入门笔记(八)CSS3属性详解:动画详解+Flex布局图文详解+Web字体

参考链接&#xff1a;https://web.qianguyihao.com/02-CSS%E5%9F%BA%E7%A1%80/12-CSS3%E5%B1%9E%E6%80%A7%E8%AF%A6%E8%A7%A3%EF%BC%9A%E5%8A%A8%E7%94%BB%E8%AF%A6%E8%A7%A3.html#_3%E3%80%81%E6%97%8B%E8%BD%AC%EF%BC%9Arotate 过渡 transition的中文含义是过渡。过渡是CSS…

【Linux网络】系统调优之聚合链路bonding,可以实现高可用和负载均衡

一、什么是多网卡绑定 二、聚合链路的工作模式 三、实操创建bonding设备&#xff08;mode1&#xff09; 1、实验 2、配置文件解读 3、查看bonding状态,验证bonding的高可用效果 三、nmcli实现bonding 一、什么是多网卡绑定 将多块网卡绑定同一IP地址对外提供服务&#xf…

《红蓝攻防对抗实战》九.内网穿透之利用GRE协议进行隧道穿透

​ 前文推荐&#xff1a; 《红蓝攻防对抗实战》一. 隧道穿透技术详解 《红蓝攻防对抗实战》二.内网探测协议出网之TCP/UDP协议探测出网 《红蓝攻防对抗实战》三.内网探测协议出网之HTTP/HTTPS协议探测出网 《红蓝攻防对抗实战》四.内网探测协议出网之ICMP协议探测出网 《红蓝…

深度学习 python opencv 火焰检测识别 计算机竞赛

文章目录 0 前言1 基于YOLO的火焰检测与识别2 课题背景3 卷积神经网络3.1 卷积层3.2 池化层3.3 激活函数&#xff1a;3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 YOLOV54.1 网络架构图4.2 输入端4.3 基准网络4.4 Neck网络4.5 Head输出层 5 数据集准备5.1 数…

Sensor 点亮出图后,颜色偏红或者偏绿是为什么?

这是因为 sensor balck level 的值配置的不正确导致&#xff0c;black level 的值一般在效果参数的 calibration 参数里面。 在驱动调试阶段&#xff0c;我们一般都是复用其他已调试好的&#xff0c;sensor 的驱动文件及效果文件&#xff0c; 而不同 sensor 的 balck level 的…

Linux是什么,Linux系统介绍

很多小伙伴都不是那么了解和知道Linux&#xff0c;到底Linux是什么&#xff1f; 像大家用到的安卓手机&#xff0c;生活中用到的各种智能设备&#xff0c;比如路由器&#xff0c;光猫&#xff0c;智能家具等&#xff0c;很多都是在Linux操作系统上。 Linux是什么&#xff1f;Li…

易思智能物流无人值守系统文件上传漏洞复现

简介 智能物流无人值守是针对流程生产企业原料采购、产成品销售及厂内物流的统一管控智能信息化平台。 目的:全企业产供销业务的集成管理,无人值守计量、降本增效、机器替代人工&#xff0c;优化物流资源管控体系。 该系统5.0版本/Sys_ReportFile/ImportReport接口处存在任意…

什么是Ribbon的饥饿加载?有什么优势?

目录 一、什么是Ribbon 二、什么是饥饿加载 三、Ribbon饥饿加载的优势 四、Ribbon饥饿加载的劣势 一、什么是Ribbon Ribbon是一个开源的、基于HTTP和TCP的客户端负载均衡工具&#xff0c;它提供了一个简单的、基于配置的负载均衡策略&#xff0c;可以帮助开发人员更轻松地…