0.线程池的概念
1.线程池使用步骤
①初始化线程池:确定线程数量,并做好互斥访问;
②启动所有线程
③准备好任务处理基类;
④获取任务接口:通过条件变量阻塞等待任务
2.atomic原子操作
`std::atomic` 是 C++11 标准库中的一个模板类,用于实现多线程环境下的原子操作。它提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中的数据竞争问题。`std::atomic` 的使用方式类似于普通的 C++ 变量,但是它的操作是原子性的;也就是说,在多线程环境下,多个线程同时对同一个 `std::atomic` 变量进行操作时,不会出现数据竞争问题。
3.线程池案例
①threadpool.cpp
#include "threadpool.h"
// Records the number of worker threads that Start() will create.
// No thread is created here; the count is stored while the pool mutex is held.
//
// @param num  Desired worker count. It is stored as given; Start() rejects
//             values that are not positive.
void XThreadPool::Init(int num)
{
    std::lock_guard<std::mutex> guard(__mux__);
    std::cout << "Thread pool Init: " << num << std::endl;
    thread_num = num;
}
// Launches the worker threads, each running XThreadPool::Run on this pool.
//
// The call is refused (with a message on std::cerr) when no positive thread
// count has been configured, or when the worker list is already populated.
// The whole operation runs with the pool mutex held.
void XThreadPool::Start()
{
    std::lock_guard<std::mutex> guard(__mux__);

    const bool configured = thread_num > 0;
    if (!configured)
    {
        std::cerr << "Please Init XThreadPool !" << std::endl;
        return;
    }

    const bool already_running = !threads.empty();
    if (already_running)
    {
        std::cerr << "Thread Pool has start!" << std::endl;
        return;
    }

    // Create exactly thread_num workers.
    int remaining = thread_num;
    while (remaining-- > 0)
    {
        threads.push_back(std::make_shared<std::thread>(&XThreadPool::Run, this));
    }
}
void XThreadPool::Run()
{
std::cout << "begin XThreadPool Run: " << std::this_thread::get_id() << std::endl;
while (true)
{
auto task = GetTask();
if (!task)
{
continue;
}
++__task_run_count__;
try
{
auto re = task->Run();
task->Setvalue(re);
}
catch (...)
{
}
--__task_run_count__;
}
std::cout << "end XThreadPool Run: " << std::this_thread::get_id() << std::endl;
}
// Queues a task for execution and wakes one waiting worker.
//
// The task is referenced by raw pointer and is not owned by the pool; the
// caller must keep it alive until it has finished running.
//
// @param task  Task to enqueue. A null pointer is ignored.
void XThreadPool::AddTask(XTask *task)
{
    if (!task)
    {
        return;
    }

    {
        std::unique_lock<std::mutex> lock(__mux__);
        // Give the task a way to poll the pool's exit flag. This is installed
        // before the task becomes visible in the queue.
        task->is_exit = [this] { return is_exit(); };
        tasks.push_back(task);
    }

    // Notify after releasing the mutex so the woken worker can take the lock
    // immediately. Without this notification a worker blocked on the
    // condition variable would never learn that a task has arrived.
    __cv__.notify_one();
}
// Removes and returns the task at the front of the queue.
//
// Blocks on the condition variable while the queue is empty and the pool has
// not been asked to exit. The predicate form of wait() re-checks the
// condition after every wake-up, so spurious wake-ups are harmless.
//
// @return The next task, or nullptr when the pool is exiting or the queue is
//         empty. (Previously front()/pop_front() were called unconditionally
//         after waking, which is undefined behaviour on an empty list.)
XTask* XThreadPool::GetTask()
{
    std::unique_lock<std::mutex> lock(__mux__);
    __cv__.wait(lock, [this] { return is_exit() || !tasks.empty(); });

    if (is_exit() || tasks.empty())
    {
        return nullptr;
    }

    XTask *task = tasks.front();
    tasks.pop_front();
    return task;
}
// Requests shutdown, wakes every waiting worker, joins all worker threads and
// empties the worker list.
//
// The mutex must not be held while joining: workers need it to leave their
// wait on the condition variable.
void XThreadPool::Stop()
{
    {
        // Set the flag while holding the mutex so that a worker which checks
        // the flag under the same mutex before blocking on the condition
        // variable cannot miss the notification below.
        std::unique_lock<std::mutex> lock(__mux__);
        exit = true;
    }
    __cv__.notify_all();

    for (auto &th : threads)
    {
        // join() on a null or non-joinable thread would crash or throw.
        if (th && th->joinable())
        {
            th->join();
        }
    }

    std::unique_lock<std::mutex> lock(__mux__);
    threads.clear();
}
②threadpool.h
#pragma once
#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <iostream>
#include <string>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>
// Base class for a unit of work with an int result.
//
// Derived classes implement Run(). The result is handed over through a
// promise/future pair: Setvalue() publishes it and GetValue() retrieves it.
class XTask
{
public:
    XTask() = default;
    // Polymorphic base: make destruction through a base pointer well defined.
    virtual ~XTask() = default;
    // Declaring the destructor would otherwise suppress the implicit move
    // operations; keep the class movable as it was before.
    XTask(XTask&&) = default;
    XTask& operator=(XTask&&) = default;

    // Performs the work and returns its result.
    virtual int Run() = 0;

    // Optional callback a task can poll to learn that it should stop early.
    // It is empty until somebody assigns it.
    std::function<bool()> is_exit = nullptr;

    // Publishes the result. May be called only once per task; a second call
    // makes std::promise throw std::future_error.
    void Setvalue(int v) { promise_.set_value(v); }

    // Blocks until Setvalue() has been called, then returns the stored value.
    // The future is obtained once and shared, so this may be called
    // repeatedly. (Calling get_future() on every invocation threw
    // std::future_error on the second call.)
    auto GetValue() { return future_.get(); }

private:
    std::promise<int> promise_; // receives the return value of Run()
    // Declared after promise_ because it is initialised from it.
    std::shared_future<int> future_ = promise_.get_future().share();
};
// A fixed-size pool of worker threads that execute queued XTask objects.
//
// Typical use: Init(n), Start(), AddTask(...) any number of times, Stop().
// Tasks are held as raw pointers and are not owned by the pool.
class XThreadPool
{
public:
    // Stores the number of worker threads Start() should create.
    void Init(int num);
    // Starts all worker threads.
    void Start();
    // Shuts the pool down: sets the exit flag and joins the workers.
    void Stop();
    // Appends a task to the queue.
    void AddTask(XTask *task);
    // Takes the next task from the queue; waits on the condition variable
    // when the queue is empty.
    XTask* GetTask();
    // True once shutdown has been requested.
    bool is_exit() const { return exit; }
    // Number of tasks currently being executed by workers.
    int task_run_count() const { return __task_run_count__; }
private:
    int thread_num = 0;            // worker count configured by Init()
    std::mutex __mux__;            // guards thread_num, threads and tasks
    void Run();                    // worker thread entry point
    std::vector<std::shared_ptr<std::thread>> threads;
    std::list<XTask*> tasks;       // pending tasks, consumed from the front
    std::condition_variable __cv__;
    // Written by the stopping thread and read by workers and running tasks
    // without holding the mutex, so it has to be atomic; a plain bool here is
    // a data race.
    std::atomic<bool> exit{false};
    std::atomic<int> __task_run_count__ = {0}; // number of tasks currently running
};
③main.cpp
#include "threadpool.h"
class MyTask :public XTask
{
public:
int Run()
{
std::cout <<"==============================================" << std::endl;
std::cout << std::this_thread::get_id() << "-Mytask" << name << std::endl;
std::cout << "==============================================" << std::endl;
for (int i = 0; i < 10; i++)
{
if(is_exit())
{
break;
}
std::cout << "." << std::flush;
std::this_thread::sleep_for(std::chrono::microseconds(500));
}
return 0;
}
std::string name = "";
};
int main()
{
XThreadPool pool;
pool.Init(16);
pool.Start();
MyTask task1;
task1.name = "test name 001";
pool.AddTask(&task1);
MyTask task2;
task2.name = "test name 002";
pool.AddTask(&task2);
std::this_thread::sleep_for(std::chrono::seconds(100));
std::cout << "task run count =" << pool.task_run_count() << std::endl;
MyTask task3;
task3.name = "test name 003";
pool.AddTask(&task3);
MyTask task4;
task4.name = "test name 004";
pool.AddTask(&task4);
std::cout << "task run count = " << pool.task_run_count() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
pool.Stop();
std::cout << "task run count =" << pool.task_run_count() << std::endl;
getchar();
return 0;
}