目录
线程互斥
进程线程间的互斥相关背景概念
数据不一致问题
锁
深度理解锁
原理角度理解:
实现角度理解:
线程同步
条件变量
测试代码
生产消费模型
生产消费模型概念
编写生产消费模型
BlockingQueue
(1)创建生产者,消费者
(2)新线程的执行函数
(3)BlockQueue的封装
线程互斥
进程线程间的互斥相关背景概念
- 临界资源: 多线程执行流共享的资源就叫做临界资源
- 临界区: 每个线程内部, 访问临界资源的代码, 就叫做临界区
- 互斥: 任何时刻, 互斥保证有且只有一个执行流进入临界区, 访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现) : 不会被任何调度机制打断的操作, 该操作只有两态, 要么完成, 要么未完成
数据不一致问题
多线程访问一个资源时,并发访问会出现数据不一致问题;
以抢票为例:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <cstdlib>
using namespace std;
int num = 1000;
void *routine(void *args)
{
while (true)
{
if (num > 0)
{
cout << (char *)args << " get a ticket :" << num << endl;
num--;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t tid[5];
char *name[5];
for (int i = 0; i < 5; i++)
{
name[i] = new char[128];
snprintf(name[i], 128, "thread-%d", i + 1);
pthread_create(&tid[i], nullptr, routine, (void *)name[i]);
}
sleep(1);
for (int i = 0; i < 5; i++)
{
pthread_join(tid[i], nullptr);
cout << "thread-" << i + 1 << "join sucess ..." << endl;
delete[] name[i];
}
}
运行会发现: 只有1000个票,抢的票数却大于1000;
出现上述结果的原因:
当票只有一张的时候,线程1进入了,判断了大于0了,但是正准备去抢票的时候,线程1 被切换了;此时线程2被唤醒了,因为线程1还没有来到及对num--,所以线程2也进入了,也在正准备抢票的时候被切换了;此时又进入了线程3,线程3和线程1,2一样,也是在num--之前被切换;此时只有一张票,但是却有三个线程;线程1 被唤醒,num--,票数变成了0;线程2又被唤醒,num--;票数为-1;线程3被唤醒;num--,票数为-2;
--:不是一步能做成了,而是:1、重读数据 2、--数据 3、写回数据;
那怎么解决上述的出现的数据不一致问题呢?
加锁
锁
pthread_mutex_t:互斥锁类型
锁的定义:
加锁和解锁:
运行结果: 通过观察运行结果,确实不会出现0,-1,等问题了;
结论:
- 加锁的范围,粒度一定要尽量小;
- 任何线程,要进行抢票,都必须先申请锁;
- 所以线程申请锁,前提是所以线程都看得到这把锁,锁本身也是共享资源;--加锁的过程必须是原子的
- 原子性:要么不做,要么做完,没有中间状态;
- 如果线程申请锁失败了,我的线程就要被阻塞;
- 如果线程申请锁成功了,继续向后运行;
- 如果线程申请锁成功了,执行临界区代码,执行临界区代码期间,可以切换,但是其他线程是进不来的,因为虽然被切换了,但是并没有释放锁;
- 对于其他线程,要么我没有申请锁,要么我释放了锁,对其他线程才有意义!!!
深度理解锁
原理角度理解:
实现角度理解:
为了实现互斥锁操作,大多数体系结构都提供了 swap 或 exchange 指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
线程a申请到锁:1、寄存器al清零 2、交互寄存器al和锁中的值 3、判断(如果寄存器al中>0,说明申请到了锁,不满足条件,则没有申请到锁);
线程同步
我们以下面的例子来引入:
在一个自习室中,一次只允许一个人进入,如果一个人拿着钥匙一直进入的话就不合理了;所以管理员提出了规则:1、自习完的同学,规划钥匙后,不能立马申请;2、第二次申请,必须排队;
这样所有人访问自习室的过程,是安全的且具有一定的顺序性!---->同步---->严格的顺序性;
所以同步就是为了保护我们的顺序性;
看我们互斥时的代码允许结果,我们会发现最后都是一个线程抢到票,这样对其他线程就不公平,引入同步就能很好的解决这个问题;
条件变量
- 快速认识接口
- 认识条件变量
声明条件变量:
进行等待:
可以理解为自习室的人排队等待,在哪里等呢?在指定的条件变量下去等待;
被调用时,除了自己排队等待,还会自己释放传入的锁;
返回时,必须先参与锁的竞争,重新加上锁后,该函数才会释放;
唤醒:
唤醒一个线程:
唤醒全部线程:
测试代码
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
const int num = 5;
pthread_mutex_t mutex =PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *Wait(void *args)
{
string name = static_cast<char *>(args);
while (true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond,&mutex);//在条件变量下进行等待,这里就是进程等待的位置
cout << "I am " << name << endl;
pthread_mutex_unlock(&mutex);
// usleep(10000);
}
}
int main()
{
pthread_t tid[num];
for (int i = 0; i < num; i++)
{
char *buff = new char[1024];
snprintf(buff, 1024, "thread-%d", i + 1);
pthread_create(&tid[i], nullptr, Wait, (void *)buff);
sleep(1);
}
while(true)
{
pthread_cond_signal(&cond);
sleep(1);
}
for (int i = 0; i < num; i++)
{
pthread_join(tid[i], nullptr);
}
return 0;
}
运行结果:
生产消费模型
生产消费模型概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯, 而通过阻塞队列来进行通讯, 所以生产者生产完数据之后不用等待消费者处理, 直接扔给阻塞队列, 消费者不找生产者要数据, 而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
"321"原则:
- 1:一个交易场所(特定数据结构形式存在的一段内存空间)
- 2:两种角色(生产角色,消费角色),生成线程,消费线程
- 3:三种关系(生产和生产,消费和消费,生成和消费)
生产消费模型本质就是:通过代码,实现“321”原则,用锁和条件变量(或其他方式)来实现三种关系!!!
编写生产消费模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。 其与普通的队列区别在于, 当队列为空时, 从队列获取元素的操作将会被阻塞, 直到队列中被放入了元素; 当队列满时, 往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的, 线程在对阻塞队列进程操作时会被阻塞)
(1)创建生产者,消费者
创建生产者和消费者即创建线程:pthread_create
(2)新线程的执行函数
(3)BlockQueue的封装
#pragma once
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <queue>
#include <sys/types.h>
using namespace std;
const int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:
bool isfull()
{
if (_block_queue.size() == _max_cp)
return true;
return false;
}
bool isEmpty()
{
return _block_queue.empty();
}
public:
BlockQueue(int cap = defaultcap) : _max_cp(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty())
{
// 空了
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _block_queue.front();
_block_queue.pop();
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_p_cond);
}
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex);
while (isfull())
{
// 满了,生产者不能生产,必须等待
// pthread_cond_wait被调用的时候,除了让自己继续排队等待,还会自己释放传入的锁
// 被唤醒的时候,会重新排队加锁
pthread_cond_wait(&_p_cond, &_mutex);
}
_block_queue.push(in);
pthread_mutex_unlock(&_mutex);
pthread_cond_signal(&_c_cond);
}
private:
queue<T> _block_queue; // 临界资源
int _max_cp;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;
pthread_cond_t _c_cond;
};
以上就是线程互斥与同步,生产消费模型的全部内容,希望有所帮助!!!