线程的同步
- 一.条件变量
- 二.生产者和消费者模型
- 1.概念和特点
- 2.实现基于阻塞队列的生产者消费者模型
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
一.条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
创建条件变量
cond:要初始化的条件变量。
attr:NULL。
等待条件满足
唤醒等待
一个示例
我想让每一个线程依次对一个全局变量cnt做++操作。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <iostream>
using namespace std;
pthread_mutex_t mutex=PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;//初始化锁
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;//初始化条件变量
int cnt=0;
void *route(void *arg)
{
char *id =(char*)arg;
while(1)
{
pthread_mutex_lock(&mutex);//上锁
pthread_cond_wait(&cond,&mutex);//为什么在这等待?因为它在让线程等待时会自动释放锁
cout<<id<<",cnt:"<<cnt++<<endl;
pthread_mutex_unlock(&mutex);//解锁
}
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
sleep(1);
cout<<"main thread begin:"<<endl;
while(1)
{
sleep(1);
pthread_cond_signal(&cond);//唤醒正在等待的一个线程,默认是第一个
//pthread_cond_broadcast(&cond);//唤醒所有线程
cout<<"signal one pthread...."<<endl;
}
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);//销毁
return 0;
}
可以看到到此,所创建的线程均以一定顺序对cnt进行了++操作。注意这里的顺序不一定是1234,也有可能是2341…但它一定呈现周期性。
二.生产者和消费者模型
1.概念和特点
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
模型特点:
3种关系:消费者和消费者,生产者和生产者,生产者和消费者。
2种角色:消费者和生产者。
1个交易场所:特定的内存空间。
2.实现基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
实现一个cp模型
设置水平线,当产品数量低于水平线时开始生产,当产品数量高于水平线时开始消费。
BlockQueue.hpp
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
template<class T>
class Blockqueue
{
public:
Blockqueue(int capacity=20)
:capacity_(capacity)//默认容量是20
{
pthread_mutex_init(&mutex_, nullptr);//初始化锁
pthread_cond_init(&c_cond_, nullptr);//初始化消费者条件变量
pthread_cond_init(&p_cond_, nullptr);//初始化生产者条件变量
water_=capacity_/3;//水平线
}
T pop()
{
pthread_mutex_lock(&mutex_);//上锁
if(q_.size()==0)
{
//如果没有产品,消费者开始等待
pthread_cond_wait(&c_cond_,&mutex_);
}
T out=q_.front();
q_.pop();
if(q_.size()<water_)
{
//如果数量小于水平线,唤醒生产者
pthread_cond_signal(&p_cond_);
}
pthread_mutex_unlock(&mutex_);//解锁
return out;
}
void push(const T&in)
{
pthread_mutex_lock(&mutex_);//上锁
if(q_.size()>capacity_)
{
//如果数量高于容量,生产者开始等待
pthread_cond_wait(&p_cond_,&mutex_);
}
q_.push(in);
if(q_.size()>water_)
{
//如果数量高于水平线,唤醒消费者
pthread_cond_signal(&c_cond_);
}
pthread_mutex_unlock(&mutex_);//解锁
}
~Blockqueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
int capacity_; //容量
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
int water_;
};
main.cc
#include"Blockqueue.hpp"
void*Constumer(void*args)//消费者
{
Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);//强转
while(1)
{
int out=bq->pop();
std::cout<<"消费了一个数据:"<<out<<std::endl;
}
}
void*Producer(void*args)//生产者
{
Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);
int data=0;
while(1)
{
sleep(1);
bq->push(data);
std::cout<<"生产了一个数据:"<<data++<<std::endl;
}
}
int main()
{
Blockqueue<int> *dp=new Blockqueue<int>();
pthread_t t,c;
pthread_create(&t,nullptr,Constumer,(void*)dp);
pthread_create(&c,nullptr,Producer,(void*)dp);
pthread_join(c,nullptr);
pthread_join(t,nullptr);
delete dp;
return 0;
}