今天是一个与互斥锁和条件变量有关的一个模型,生产者消费者模型,为什么要用这个模型呢?其实这个模型我个人感觉的有点就是提高了效率,在多线程的情况下,提高了非常明显。并且解耦了生产者和消费者的关系。下面是一个这个模型的基本思路:
这个模型充分说明了两者的关系,就是一个共用的资源,一个放,一个拿,且有三种关系,两种角色,一个交易场所。取数据的我们可以看成是消费者,放数据的是生产者,交易场所就是这个共用资源,而这个关系我们应该怎样理解呢?就是如果有多个生产者和消费者,那么我们就要出现一个串行关系取数据或是放数据,不然就会乱,所以会很明显的是消费者之间是互斥关系,生产者之间也是互斥,而生产者与消费者是互斥与同步的关系。,所以弄清楚了基本的模型,我们就来看看如何实现。.
注:这个交易场所也就是共用资源可以不是队列,这里只是打个比方,为了更好的理解。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <string>
#include <cstring>
#include "mtx.hpp"
#include "model.hpp"
#include <time.h>
using namespace std;
//创建线程数量
#define NUMPTHREAD_T 6
//函数指针
typedef void (*func)(int);
//函数声明
void product(int number);
void custcom(int number);
proCust<int> mod;
//判断是生产者还是消费者
struct Pro_or_cust
{
int _number;
string _name;
func _product = product;
func _custcom = custcom;
Pro_or_cust(int number)
: _number(number)
{
}
};
void product(int number)
{
while (true)
{
//同步所有线程
sleep(3);
//随机生成数,项目中一般是做其他任务
int val = rand() % 100000;
cout << "我是生产者" << pthread_self() << " " << number << endl;
mod.push(val, number);
sleep(1);
}
}
void custcom(int number)
{
while (true)
{
cout << "我是消费者" << pthread_self() << " " << number << endl;
int ret = mod.pop(number);
//打印获得数据,一般也可以处理获取的数据
cout << "我消费的数据" << ret << endl;
sleep(1);
}
}
void *enter(void *args)
{
Pro_or_cust *p = (Pro_or_cust *)args;
//判断是生产者还是消费者
if (strcmp(p->_name.c_str(), "custcom") == 0)
p->_custcom(p->_number);
else
p->_product(p->_number);
delete p;
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t tid[NUMPTHREAD_T];
for (int i = 0; i < NUMPTHREAD_T; i++)
{
Pro_or_cust *str = new Pro_or_cust(i + 1);
if (i % 2 == 0)
str->_name = "custcom";
else
str->_name = "product";
pthread_create(tid + i, nullptr, enter, str);
// sleep(1);
}
for (int i = 0; i < NUMPTHREAD_T; i++)
{
pthread_join(tid[i], nullptr);
}
cout << "回收资源成功" << endl;
return 0;
}
#pragma once
#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>
#include <time.h>
#include "mtx.hpp"
#define NUMSIZE 10
using namespace std;
template <class T>
class proCust
{
public:
proCust(size_t size = NUMSIZE)
: _size(size)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
void push(const T &x, int number)
{
Rmtx s(_mtx);
cout<<"我抢到锁了"<<number<<" " <<&_mtx<<endl;
sleep(1);
while (_size == _q.size())
{
pthread_cond_signal(&_empty);
pthread_cond_wait(&_full, &_mtx);
}
_q.push(x);
cout << "我是生产者:" << number << "号"
<< " "
<< "我的tid: " << pthread_self() << endl;
}
T pop(int number)
{
Rmtx s(_mtx);
cout<<"我抢到锁了"<<number<<" " <<&_mtx<<endl;
sleep(1);
while (_q.size() == 0)
{
pthread_cond_signal(&_full);
pthread_cond_wait(&_empty, &_mtx);
}
T x = _q.front();
_q.pop();
cout << "我是消费者:" << number << "号"
<< " "
<< "我的tid: " << pthread_self() << endl;
return x;
}
~proCust()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
size_t _size;
queue<T> _q;
pthread_cond_t _full;
pthread_cond_t _empty;
pthread_mutex_t _mtx;
};
#pragma once
#include <iostream>
#include <pthread.h>
#include <utility>
//RAII风格的加锁方式
struct Rmtx
{
public:
Rmtx(pthread_mutex_t &mtx)
: _mtx(mtx)
{
pthread_mutex_lock(&_mtx);
std::cout << "已上锁" << std::endl;
}
~Rmtx()
{
pthread_mutex_unlock(&_mtx);
std::cout << "已解锁" << std::endl;
}
private:
pthread_mutex_t &_mtx;
};
首先是两个头文件,一个test.cc文件。
这个模型本身很简单,但是实现起来,个人认为还是要有不少注意的点,而且不好找出错误。
首先给大家说说实现这个模型我个人认为要注意的点。
1.如果有小伙伴想给新线程传编号,就是第几个创建的线程,这里一定要注意。那就是因为pthread_create这个函数本身不会有延迟,但是奈何这个函数中,是创建一个新线程,也就是说这个函数内部就会分开,一个是新线程,一个是主线程,而主线程的速度要比新线程快,所以有可能就是主线程运行到第二次循环了,新线程可能才创建好,所以这里有时候会达不到自己的预期,切代码正确,就是检查不出错误原因。
2.其次就是这个RAII的风格方式加锁,不知道大家写的时候有没有遇见过,就是把锁的初始化写到了构造函数(上面的代码是写到Rmtx中构造函数),其实这样写是错的,这样的话类似于加不上锁,我也测试了很长时间才发现,因为这样的话每个线程都会创建一个这个对象,所以会把这个锁初始化好多次,造成类似于没有加锁的那种情况,就是多线程共同访问临界资源,这个一定要注意。
我上网查了下类似于同步多次初始化的结果,但是没查到,有的也没看懂,我个人认为可能是破坏了锁,所以导致错误。
多线程部分出现错误还是比较难调试的,所以一定要小心。
希望大家支持!!!