进程
- 进程就是运行中的程序
- 线程=进程中的进程
1、C++11 Thread线程库基础
#include <iostream>
#include <thread>
#include<string>
void printthread(std::string msg){
std::cout<<msg<<std::endl;
for (int i = 0; i < 1000; i++)
{
std::cout<<"my "<<i<<std::endl;
}
return;
}
int main(){
//std::thread t(入口)
//1、创建线程
std::thread t(printthread,"hello thread");
//2、保证等待线程结束,主线程在结束
// t.join();
//3、分离线程
//t.detach();
//4、joinable 判断是否可以调用join
bool isjoin = t.joinable();
if(isjoin){
t.join();
}
std::cout<<"over"<<std::endl;
system( "pause");
return 0;
}
2、线程函数中的数据未定义错误
2.1 临时变量
错误例子
#include <iostream>
#include <thread>
void foo(int& x){
x+=1;
}
int main(){
//
std::thread t(foo,1);
t.join();
system( "pause");
return 0;
}
正确方案
#include <iostream>
#include <thread>
void foo(int& x){
x+=1;
}
int main(){
//
int i=1;
std::thread t(foo,std::ref(i));
t.join();
std::cout<<i<<std::endl;
system( "pause");
return 0;
}
2.2 传递指针/引用指向局部变量
2.1 的 i 在 main中,那要是不在呢?
#include <iostream>
#include <thread>
std::thread t;
void foo(int& x){
x+=1;
}
void externi(){
int i=1;
t=std::thread (foo,std::ref(i));
}
int main(){
//
externi();
t.join();
system( "pause");
return 0;
}
会报错
那怎么办呢?
#include <iostream>
#include <thread>
std::thread t;
int i=1;
void foo(int& x){
x+=1;
}
void externi(){
t=std::thread (foo,std::ref(i));
}
int main(){
//
externi();
t.join();
std::cout<<i<<std::endl;
system( "pause");
return 0;
}
2.3 参数被提前手动释放
智能指针
#include <iostream>
#include <thread>
#include <memory>
class myclass
{
private:
/* data */
public:
void foo(){
std::cout<<"mememem"<<std::endl;
}
};
int main(){
std::shared_ptr<myclass> a=std::make_shared<myclass> ();
std::thread t(&myclass::foo,a);
system( "pause");
return 0;
}
2.4 类的private函数
友元
#include <iostream>
#include <thread>
#include <memory>
class myclass
{
private:
friend void foo_thread();
void foo(){
std::cout<<"mememem"<<std::endl;
}
public:
};
void foo_thread(){
std::shared_ptr<myclass> a=std::make_shared<myclass> ();
std::thread t(&myclass::foo,a);
t.join();
}
int main(){
foo_thread();
system( "pause");
return 0;
}
3互斥量
3.1数据共享–数据竞争问题
#include <iostream>
#include <thread>
int a = 0;
void func(){
for (int i = 0; i < 1000000; i++)
{
a+=1;
}
}
int main(){
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout<<a<<std::endl;
system( "pause");
return 0;
}
3.2 互斥锁
#include <iostream>
#include <thread>
#include <mutex>
int a = 0;
std::mutex mt;
void func(){
for (int i = 0; i < 1000000; i++)
{
mt.lock();
a+=1;
mt.unlock();
}
}
int main(){
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout<<a<<std::endl;
system( "pause");
return 0;
}
3.3 理解线程安全
4互斥量死锁
4.1 死锁的概念
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
mutex m1,m2;
void func1(){
for (int i = 0; i < 100000; i++)
{
m1.lock();
m2.lock();
m1.unlock();
m2.unlock();
}
}
void func2(){
for (int i = 0; i < 100000; i++)
{
m1.lock();
m2.lock();
m2.unlock();
m1.unlock();
}
}
int main(){
thread t1(func1);
thread t2(func2);
t1.join();
t2.join();
cout<<"over<<"<<endl;
system( "pause");
return 0;
}
4.2 解决方案
5 lock_guard 和 std::unique_lock
5.1 lock_guard
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int a=0;
mutex m1;
void func1(){
for(int i=0;i<10000;i++)
{lock_guard<mutex> gm(m1);
a++;
}
}
int main(){
thread t1(func1);
t1.join();
cout<<a<<endl;
system( "pause");
return 0;
}
5.2 std::unique_lock
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int a=0;
mutex m1;
void func1(){
for(int i=0;i<10000;i++)
{
unique_lock<mutex> gm(m1);
a++;
}
}
int main(){
thread t1(func1);
t1.join();
cout<<a<<endl;
system( "pause");
return 0;
}
6 call_once
6.1 单例模式
6.2 例子:日志类
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
class Log{
public:
Log(){};
Log(const Log& log)=delete;
Log &operator =(const Log& log)=delete;
static Log& GetInstance(){
static Log log;//懒汉模式
return log;
//饿汉模式
/**
static Log *log=nullptr;
if(!log) log = new Log;
return *log;
*/
}
void PrintLog(string msg){
cout << __TIME__ <<" " <<msg<<endl;
}
};
int main(){
Log::GetInstance().PrintLog("error");
system( "pause");
return 0;
}
6.3 call_once
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
using namespace std;
// 需要一次性初始化的共享资源
class DatabaseConfig {
private:
string serverAddress;
int port;
DatabaseConfig() : serverAddress("127.0.0.1"), port(3306) {
cout << "数据库配置初始化完成!" << endl;
}
public:
static DatabaseConfig& getInstance() {
static once_flag initFlag;
static DatabaseConfig* instance = nullptr;
call_once(initFlag, []() {
instance = new DatabaseConfig();
});
return *instance;
}
void showConfig() {
cout << "Server: " << serverAddress
<< ":" << port << endl;
}
};
// 多线程测试函数
void threadTask(int id) {
this_thread::sleep_for(chrono::milliseconds(100 * id));
auto& config = DatabaseConfig::getInstance();
cout << "线程" << id << "获取配置:";
config.showConfig();
}
int main() {
vector<thread> threads;
// 创建10个线程竞争访问
for(int i = 0; i < 10; ++i) {
threads.emplace_back(threadTask, i);
}
// 等待所有线程完成
for(auto& t : threads) {
t.join();
}
system("pause");
return 0;
}
- 合理使用 call_once 可以让多线程代码更简洁、更安全,尤其适合需要一次性初始化的场景
7 condition_variable
7.1 生产者-消费者模式概述
生产者-消费者模式是多线程编程中经典的同步问题,需要满足以下条件:
- 生产者线程生成数据并放入共享缓冲区。
- 消费者线程从缓冲区取出数据并处理。
- 同步要求:
- 缓冲区满时,生产者等待消费者消费数据。
- 缓冲区空时,消费者等待生产者生产数据。
7.2 核心组件
- 共享缓冲区:通常使用队列(
std::queue
)实现。 - 互斥锁(
std::mutex
):保护对缓冲区的并发访问。 - 条件变量(
std::condition_variable
):not_full
:生产者等待缓冲区非满。not_empty
:消费者等待缓冲区非空。
7.3实现代码
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;
const int BUFFER_SIZE = 5; // 缓冲区容量
queue<int> buffer; // 共享缓冲区
mutex mtx; // 互斥锁
condition_variable not_full; // 缓冲区非满条件
condition_variable not_empty; // 缓冲区非空条件
// 生产者函数
void producer(int id) {
for (int i = 0; i < 10; ++i) {
unique_lock<mutex> lock(mtx);
// 如果缓冲区满,等待消费者消费
not_full.wait(lock, [] {
return buffer.size() < BUFFER_SIZE;
});
// 生产数据
int data = id * 100 + i;
buffer.push(data);
cout << "生产者 " << id << " 生产数据: " << data << endl;
lock.unlock();
not_empty.notify_one(); // 通知消费者
this_thread::sleep_for(chrono::milliseconds(100));
}
}
// 消费者函数
void consumer(int id) {
for (int i = 0; i < 10; ++i) {
unique_lock<mutex> lock(mtx);
// 如果缓冲区空,等待生产者生产
not_empty.wait(lock, [] {
return !buffer.empty();
});
// 消费数据
int data = buffer.front();
buffer.pop();
cout << "消费者 " << id << " 消费数据: " << data << endl;
lock.unlock();
not_full.notify_one(); // 通知生产者
this_thread::sleep_for(chrono::milliseconds(200));
}
}
int main() {
thread producers[2];
thread consumers[3];
// 启动2个生产者线程
for (int i = 0; i < 2; ++i) {
producers[i] = thread(producer, i);
}
// 启动3个消费者线程
for (int i = 0; i < 3; ++i) {
consumers[i] = thread(consumer, i);
}
// 等待所有线程结束
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();
return 0;
}
7.4 代码解析
-
共享资源保护:
- 所有对缓冲区的操作(
push
、pop
)均在互斥锁mtx
的保护下进行。 - 使用
unique_lock
自动管理锁的生命周期。
- 所有对缓冲区的操作(
-
条件变量的使用:
- 生产者等待条件:
not_full.wait(lock, predicate)
当缓冲区满时(buffer.size() >= BUFFER_SIZE
),生产者线程阻塞,直到消费者消费数据后通过not_full.notify_one()
唤醒。 - 消费者等待条件:
not_empty.wait(lock, predicate)
当缓冲区空时(buffer.empty()
),消费者线程阻塞,直到生产者生产数据后通过not_empty.notify_one()
唤醒。
- 生产者等待条件:
-
通知机制:
- 生产者生产数据后调用
not_empty.notify_one()
,唤醒一个等待的消费者。 - 消费者消费数据后调用
not_full.notify_one()
,唤醒一个等待的生产者。
- 生产者生产数据后调用
7.5 运行结果示例
生产者 0 生产数据: 0
消费者 0 消费数据: 0
生产者 1 生产数据: 100
消费者 1 消费数据: 100
生产者 0 生产数据: 1
消费者 2 消费数据: 1
...
(输出将展示生产与消费的交替过程)
7.6 关键点总结
-
防止虚假唤醒:
条件变量的wait
必须配合谓词(如buffer.size() < BUFFER_SIZE
)使用,确保即使被意外唤醒也能重新检查条件。 -
资源管理:
unique_lock
在wait
时自动释放锁,唤醒后重新获取锁。- 使用
notify_one
而非notify_all
,减少不必要的线程竞争。
-
死锁避免:
- 确保在调用
notify_one
前释放锁(通过lock.unlock()
)。 - 避免在持有锁时进行耗时操作(如示例中的
sleep_for
在锁外执行)。
- 确保在调用
7.7 扩展场景
-
多生产者和多消费者:
当前代码已支持多个生产者和消费者,通过调整线程数量即可验证。 -
动态缓冲区大小:
可将BUFFER_SIZE
设为动态值,根据需求调整。 -
复杂数据类型:
将queue<int>
替换为自定义数据类型队列,实现更复杂的生产-消费逻辑。
此实现完整展示了如何利用condition_variable
实现线程安全的生产者-消费者模式,可直接用于实际项目中的任务队列、线程池等场景。
8 跨平台线程池
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
// 构造函数,启动指定数量的工作线程
ThreadPool(size_t threads = std::thread::hardware_concurrency())
: stop(false) {
for(size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
// 将任务添加到任务队列,返回一个future以便获取结果
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// 不允许在停止线程池后添加新任务
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// 析构函数,等待所有任务完成并停止所有线程
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker : workers)
worker.join();
}
private:
std::vector<std::thread> workers; // 工作线程集合
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 任务队列互斥锁
std::condition_variable condition; // 条件变量
bool stop; // 停止标志
};
// 使用示例
int main() {
ThreadPool pool(4); // 创建4个工作线程
// 提交多个任务到线程池
std::vector<std::future<int>> results;
for(int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::this_thread::sleep_for(std::chrono::seconds(1));
return i*i;
})
);
}
// 获取任务结果
for(auto && result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
9 异步并发 async future packaged task promise
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <vector>
int compute(int x) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
return x * x;
}
int main() {
// 使用 std::async 启动多个异步任务
std::vector<std::future<int>> futures;
for (int i = 1; i <= 5; ++i) {
futures.push_back(std::async(std::launch::async, compute, i));
}
// 获取所有任务的结果
for (auto& future : futures) {
std::cout << "Result: " << future.get() << std::endl;
}
// 使用 std::packaged_task 手动控制任务执行
std::packaged_task<int(int)> task(compute);
std::future<int> future = task.get_future();
std::thread t(std::move(task), 10);
t.join(); // 等待线程完成
std::cout << "Packaged Task Result: " << future.get() << std::endl;
return 0;
}
10 原子操作atomic
#include <iostream>
#include <thread>
#include <atomic>
using namespace std;
atomic<int> a(0); // 使用 atomic<int> 替代普通 int
void func1() {
for (int i = 0; i < 10000; i++) {
a++; // 原子操作,无需额外的互斥锁
}
}
int main() {
thread t1(func1);
t1.join();
cout << a << endl; // 输出最终结果
system("pause");
return 0;
}