在上一篇博客中我们已经介绍到了线程控制以及对应的函数调用接口,接下来要讲的是真正的多线程,线程安全、线程互斥、同步以及锁。
一、多线程
简单写个多线程的创建、等待的代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<vector>
using namespace std;
const int threadnum = 5;
void *handlerTask(void *args)
{
string name = static_cast<char*>(args);
while(true)
{
sleep(1);
cout<<"I am "<<name<<endl;
}
return nullptr;
}
int main()
{
vector<pthread_t> threads;
for(int i=0;i<threadnum;i++)
{
char threadname[64];
snprintf(threadname,64,"thread-%d",i+1);
pthread_t tid;
sleep(1);
pthread_create(&tid,nullptr,handlerTask,threadname);//多线程创建
threads.push_back(tid);
}
for(auto &tid:threads)
{
pthread_join(tid,nullptr);//等待
}
return 0;
}
用一下之前的接口实现多线程的抢票功能
thread.hpp
#include<iostream>
#include<signal.h>
#include<unistd.h>
#include<functional>
#include<pthread.h>
namespace Thread_Module
{
template <typename T>
using func_t = std::function<void(T&)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func,T &data,const std::string &threadname = "none")
:_threadname(threadname)
,_func(func)
,_data(data)
{}
static void* threadrun(void *args)//线程函数
{
Thread<T> *self = static_cast <Thread<T>*>(args);
self->Excute();
return nullptr;
}
bool Start()//线程启动!
{
int n = pthread_create(&_tid,nullptr,threadrun,this);
if(!n)//返回0说明创建成功
{
_stop = false;//说明线程正常运行
return true;
}
else
{
return false;
}
}
void Stop()
{
_stop = true;
}
void Detach()//线程分离
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()//线程等待
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
std::string threadname()//返回线程名字
{
return _threadname;
}
~Thread()
{}
private:
pthread_t _tid;//线程tid
std::string _threadname;//线程名
T &_data;//数据
func_t<T> _func;//线程函数
bool _stop; //判断线程是否停止 为true(1)停止,为false(0)正常运行
};
}
main.cc
#include <iostream>
#include <mutex>
#include "Thread.hpp"
#include<string>
#include<vector>
using namespace Thread_Module;
int g_tickets = 10000; // 共享资源,没有保护的
void route(int &tickets)
{
while (true)
{
if(tickets>0)
{
usleep(1000);
printf("get tickets: %d\n", tickets);
tickets--;
}
else
{
break;
}
}
}
const int num = 4;
int main()
{
// std::cout << "main: &tickets: " << &g_tickets << std::endl;
std::vector<Thread<int>> threads;
// 1. 创建一批线程
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads.emplace_back(route, g_tickets, name);
}
// 2. 启动 一批线程
for (auto &thread : threads)
{
thread.Start();
}
// 3. 等待一批线程
for (auto &thread : threads)
{
thread.Join();
std::cout << "wait thread done, thread is: " << thread.threadname() << std::endl;
}
return 0;
}
运行之后,可以看出抢多了票,票数怎么能抢到负数了
原因是发生了线程不安全问题
二、线程安全
抢票抢到负数造成的票数不一致本质是g_tickets是全局变量(无保护),对全局的g_tickets的判断不是原子的
为什么抢到负数?
可能是tickets已经等于1了,但是判断的时候,让多个线程进入抢票逻辑了(涉及到CPU调度),进入抢票逻辑后还执行了ticket--(等价于tickets = tickets - 1)
tickets--的本质:
- 从内存读到CPU
- CPU内部进行--操作
- 数据写回内存
tickets--操作不是原子的-----又引出一个问题------>什么是原子的?
当一条语句转成汇编代码只有一条汇编是,那么就可以说这条语句是原子的
那么如何解决这种数据不一致问题呢?
导致数据不一致的原因:共享资源没有被保护,多线程对该资源进行了交叉访问
而解决办法就是:对共享资源进行加锁
三、线程互斥(锁)
再引入几个概念
临界资源:多个执行流进行安全访问的共享资源
临界区:访问临界资源的代码就叫做临界区
线程互斥:让多个线程串行访问共享资源,任何时候只能有一个执行流在访问共享资源
加锁的本质就是把并行执行转成串行执行
原子性:对一个资源进行访问的时候要么就不做,要么就做完
当一条语句转成汇编代码只有一条汇编是,那么就可以说这条语句是原子的
3.1加锁保护
锁的相关系统调用:
锁的全局(静态)初始化:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITALIZER;
man pthread_mutex_init(初始化锁)
参数:
pthread_mutex_t *mutex: 创建的互斥锁指针
const pthread_mutexattr_t *mutexattr: 锁的属性,一般设为nullptr
返回值: 初始化成功返回0,失败返回错误码
man pthread_mutex_destroy(锁的销毁)
参数:
pthread_mutex_t *mutex: 创建的互斥锁指针
返回值: 销毁成功返回0,失败返回错误码
man pthread_mutex_lock(加锁):
参数:
pthread_mutex_t *mutex: 互斥锁指针
返回值: 加锁成功返回0,失败返回错误码
1
申请锁成功:函数就会返回,允许继续向后运行
申请锁失败:函数就会阻塞
函数调用失败:出错返回
man pthread_mutex_unlock(解锁):
参数:
pthread_mutex_t *mutex: 还是互斥锁指针
返回值: 解锁成功返回0,失败返回错误码
一般都是这么用的
//初始化
//pthread_mutex_t mutex = PTHREAD_MUTEX_INITALIZER;
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,nullptr);
//加锁
pthread_mutex_lock(&mutex);
//临界区——访问临界资源的代码写这里
//。。。
//访问完了解锁
pthread_mutex_unlock(&mutex);
3.2解决数据不一致问题
有了以上的系统调用我们对抢票代码进行修改,对其进行加锁
.hpp
#include<iostream>
#include<signal.h>
#include<unistd.h>
#include<functional>
#include<pthread.h>
namespace Thread_Module
{
template <typename T>
using func_t = std::function<void(T)>;
// typedef std::function<void(const T&)> func_t;
template <typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
Thread(func_t<T> func,T data,const std::string &threadname = "none")
:_threadname(threadname)
,_func(func)
,_data(data)
{}
static void* threadrun(void *args)//线程函数
{
Thread<T> *self = static_cast <Thread<T>*>(args);
self->Excute();
return nullptr;
}
bool Start()//线程启动!
{
int n = pthread_create(&_tid,nullptr,threadrun,this);
if(!n)//返回0说明创建成功
{
_stop = false;//说明线程正常运行
return true;
}
else
{
return false;
}
}
void Stop()
{
_stop = true;
}
void Detach()//线程分离
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()//线程等待
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
std::string threadname()//返回线程名字
{
return _threadname;
}
~Thread()
{}
private:
pthread_t _tid;//线程tid
std::string _threadname;//线程名
T _data;//数据
func_t<T> _func;//线程函数
bool _stop; //判断线程是否停止 为true(1)停止,为false(0)正常运行
};
}
.cc
class ThreadData
{
public:
ThreadData(int &tickets,const std::string &name,pthread_mutex_t &mutex)
:_tickets(tickets)
,_name(name)
,_total(0)
,_mutex(mutex)
{}
~ThreadData()
{}
public:
int &_tickets;//总票数的引用
std::string _name;
int _total;//线程单独抢的票数
//std::mutex &_mutex;
pthread_mutex_t &_mutex;
};
int g_tickets = 10000;//总票数
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
void scramble(ThreadData *td)//抢票
{
while(true)
{
//pthread_mutex_lock(&gmutex);
pthread_mutex_lock(&td->_mutex);
if(td->_tickets > 0)
{
usleep(1000);
std::cout<<"线程:"<<td->_name.c_str()<<"正在抢票中..."<<" 抢到的票数是:"<<td->_total<<std::endl;
td->_tickets--;
// pthread_mutex_unlock(&gmutex);
pthread_mutex_unlock(&(td->_mutex));
td->_total++;
}
else
{
pthread_mutex_unlock(&(td->_mutex));
//pthread_mutex_unlock(&gmutex);
break;
}
}
}
const int num = 3;
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,nullptr);
std::vector<Thread<ThreadData*>> threads;
std::vector<ThreadData *> datas;
//创建一些线程
for(int i=0;i<num;i++)
{
std::string name = "thread - "+std::to_string(i+1);
ThreadData *td = new ThreadData(g_tickets,name,mutex);
threads.emplace_back(scramble,td,name);
datas.emplace_back(td);
}
//启动全部线程
for(auto &thread:threads)
{
thread.Start();
}
//等待线程
for(auto &thread:threads)
{
thread.Join();
}
for(auto data:datas)
{
std::cout<<"线程名字:"<<data->_name<<"以及此线程总共抢的票数:"<<data->_total<<std::endl;
delete data;
}
pthread_mutex_destroy(&mutex);
return 0;
}
此时票数正好10000,结果正常
需要注意的是:
当一个线程从临界区出来并且释放锁后,执行后续任务这段时间,其它线程才有更大的概率去竞争锁
加锁时,要保证临界区的粒度非常小,那些不是必须放临界区内的代码就别放里面
3.3互斥
锁的封装
#include<iostream>
#include<pthread.h>
class LockGuard
{
public:
LockGuard( pthread_mutex_t *mutex)
:_mutex(mutex)
{
pthread_mutex_lock(_mutex);
}
~LockGuard()
{
pthread_mutex_unlock(_mutex);
}
private:
pthread_mutex_t *_mutex;
}
线程互斥:让多个线程以串行方式访问一段临界区代码
之前使用锁的代码,这个锁必须被所有线程所看见,所以对于锁本身而言也是一个共享资源,那么谁来保证锁的安全性呢?通过加锁解锁都是原子的来保证锁自身的安全性
互斥到底是啥?
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下
交换(xchgb)的本质:不是拷贝到寄存器,而是所有线程在竞争锁的时候只有一个1(这个1就是锁)
加锁过程中,xchgb是原子的,可以保证锁的安全
锁只能被一个线程所持有,而且由于xchgb汇编只有一条指令,即使申请锁的过程时CPU进行调度线程被切换也不影响。一旦一个线程通过交换那到了锁,它走的时候也就把锁带走了,其它线程只能等他用完把锁还回来。
CPU寄存器硬件只有一套,但是CPU寄存器内部的数据,就是线程的硬件上下文
数据在内存中,所有线程都能访问,属于共享的;但是如果转移到CPU内部寄存器中,就属于一个线程独有了
mutex简单理解就是一个0/1的计数器,用于标记资源访问状态:
- 0表示已经有执行流加锁成功,资源处于不可访问,
- 1表示未加锁,资源可访问。
但是还有另外的问题:加锁,线程竞争锁是自由竞争的,竞争能力强的线程会导致其他线程抢不到锁,访问不了临界资源导致其他线程一直阻塞,造成其它线程的饥饿问题