//blockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gcap):_cap(cap)//初始化阻塞队列的容量
{
pthread_mutex_init(&_mutex, nullptr);//对所进行初始化
pthread_cond_init(&_consumerCond, nullptr);//对条件变量进行初始化
pthread_cond_init(&_productorCond, nullptr);
}
bool isFull(){ return _q.size() == _cap; }
bool isEmpty() { return _q.empty(); }
void push(const T &in)
{
//向队列中push数据的第一件事是向整个队列中进行加锁工作,防止我打算生产的时候有人正在消费
pthread_mutex_lock(&_mutex);
// 细节1:一定要保证,在任何时候,都是符合条件,才进行生产
if(isFull()) // 1. 我们只能在临界区内部,判断临界资源是否就绪!因为临界资源是被多线程共享的,所以判断临界资源(本身也是对临界资源的一种访问)是否就绪就需要在临界区中(也就是加锁和解锁之间)进行,注定了我们在当前一定是持有锁的!
{
// 2. 要让线程进行休眠等待,不能持有锁等待!
// 3. 注定了,pthread_cond_wait要有锁的释放的能力!
pthread_cond_wait(&_productorCond, &_mutex); // 当前是满的,就不能够再生产了,于是再自己的条件变量下进行休眠,wait函数一执行就会释放锁。我休眠了(可以理解为线程被切换),我醒来的时候,在哪里往后执行呢?
// 4. 当线程醒来的时候,注定了继续从临界区内部继续运行!因为我是在临界区被切走的!
// 5. 注定了当线程被唤醒的时候,继续在pthread_cond_wait函数处向后运行,又要重新申请锁,申请成功才会彻底返回
}
// 队列没有满,就要让他进行生产
_q.push(in);//执行完该行代码可以保证队列为非空
// 加策略
//if(_q.size() >= _cap/2)
pthread_cond_signal(&_consumerCond);//生产者唤醒消费者,但是此时消费者是否休眠不得而知,如果此时消费者没有被wait休眠则该行代码相当于什么都没有做,被丢弃掉。
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
if(isEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
*out = _q.front();
_q.pop();//执行完该行代码至少可以保证队列没有满
// 加策略
pthread_cond_signal(&_productorCond);//消费者唤醒生产者,唤醒行为可以放在释放锁前面或者后面都可以
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);//销毁锁
pthread_cond_destroy(&_consumerCond);//销毁条件变量
pthread_cond_destroy(&_productorCond);
}
private:
std::queue<T> _q;
int _cap;//队列的容量上限
// 为什么我们的这份代码,只用一把锁呢,根本原因在于,
// 我们生产和消费访问的是同一个queue&&queue被当做整体使用!
pthread_mutex_t _mutex; //不论是生产者还是消费者,两个访问的都是同一个队列,所以要避免两个访问同一份资源,为了防止有人向队列放数据的同时又有人像队列中拿数据,在这里定义一个锁。
pthread_cond_t _consumerCond; // 消费者对应的条件变量,空,wait,防止消费者加锁发现队列为空释放锁然后继续加锁···,这种不断地循环造成的浪费。
pthread_cond_t _productorCond; // 生产者对应的条件变量,满,wait
};//main.cc
#include "blockQueue.hpp"
#include "task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *consumer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);//休眠1s导致我们执行代码的时候队列一下子就被生产满了吗,然后就是消费一个生产一个。
int data = 0;
// 1. 将数据从blockqueue中获取 -- 获取到了数据
bq->pop(&data);
// 2. 结合某种业务逻辑,处理数据! -- TODO
std::cout << " consumer data: " << data << std::endl;
}
}
void *productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
// sleep(1);
// 1. 先通过某种渠道获取数据
int data = rand() % 10 + 1;
// 2. 将数据推送到blockqueue -- 完成生产过程
bq->push(data);
std::cout << " productor Task: " << data << std::endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
BlockQueue<int> *bq = new BlockQueue<int>();
// 单生产和单消费(只需要维护一种关系也就是生产者和消费者之间的关系) -> 多生产和多消费(维护三种关系)
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
线程是一个执行分支,执行粒度要比进程更细(线程执行的是进程的一部分),调度成本更低(不用再进行对cache的切换)。线程是进程内部的一个执行流,就是线程在进程的地址空间内运行,所以线程隶属于该进程。线程是cpu调度的基本单位,进程是承担分配系统资源的基本实体。
一个进程在系统中创建是有自己的task_struct的,该结构下面有一个指针变量指向该进程的地址空间,如果该进程创建一个子进程,也会创建一个pcb并且里面存在一个变量指向该子进程的地址空间。现在有一种创建进程的方法只创建pcb,这些pcb不再单独创建自己的页表、地址空间、加载自己的代码和数据,而是让创建的多个pcb指向父进程pcb的地址空间,未来让创建的这些pcb执行父进程代码区当中的不同区域的代码,这些pcb执行的是同一个进程内的不同代码区域,而在进程内创建的多个pcb单独叫做线程。父进程的代码就由以前的串行执行变成现在多个执行流进行并发执行。现在进程升级为包括一大批的执行流(pcb)、地址空间、页表、以及该进程对应的代码和数据,进程成为了承担分配系统资源的基本实体,我们创建进程时一定是从0开始创建的,一定要重新至少创建一个pcb、一个地址空间、一个页表以及将在到内存的代码和数据必须也要有,这一整套机制我们必须在系统层面上申请内存资源,包括整个进程再宏观上也要申请cpu资源,而线程的资源是伸手向进程要的。
Linux复用了PCB的结构体,用PCB 模拟线程的TCB不就行了,很好的复用了进程的设计方案。Linux没有真正意义上的线程,而是用进程方案模拟的线程。Linux操作系统看到pcb可能是一个单执行流的进程也可能是一个包含多个执行流的进程中的一个线程。LWP是轻量级线程id,os调度的时候,使用LWP来识别线程的,而对于单执行流的进程pid和LWP是一样的。
void *thread1_run(void *args)
{
while(1)
{
printf("我是线程1, 我正在运行\n");
sleep(1);
}
}
void *thread2_run(void *args)
{
while(1)
{
printf("我是线程2, 我正在运行\n");
sleep(1);
}
}
void *thread3_run(void *args)
{
while(1)
{
printf("我是线程3, 我正在运行\n");
sleep(1);
}
}
int main()
{
pthread_t t1, t2,t3;
pthread_create(&t1, NULL, thread1_run, NULL);
pthread_create(&t2, NULL, thread2_run, NULL);
pthread_create(&t3, NULL, thread3_run, NULL);
while(1)
{
printf("我是主线程, 我正在运行\n");
sleep(1);
}
return 0;
}
上面三个线程的pid是一样的,但是线程的LWP也就是轻量级线程id是不一样的。
虚拟地址空间的基本单位是字节,所以我们虚拟地址空间会有2^32个字节,而我们要把虚拟地址映射到内存,通过页表来维护,而页表也需要占据内存空间。操作系统在和磁盘这样的设备进行交互的时候绝对不是按照字节为单位而是按照块4kb(这是文件系统给的要求,和具体的os有关)为单位进行io交互的。即便修改的是磁盘的一个比特位,也要求os和磁盘进行4KB的交互。磁盘存储文件也是按照块存储的。内存实际在进行内存管理的时候,也要是4KB位单位。物理内存实际在进行内存管理的时候也要是以4kb为单位的块进行管理,内存管理的本质就是将磁盘中特定的4kb的块(数据内容)放入到物理内存中的4kb的空间(数据保存的空间)。 物理内存是靠什么划分成4KB大小的或者说物理内存中的页page是如何被管理的?是靠操作系统的内核page数据结构来进行管理的,这个结构体的属性非常的少,比如page状态(位图结构,当前是否被占用),采用struct page mem[1048576] ,这就将内存的管理转换为对数组的数据结构进行管理,这部分内核数据结构将会占用内存4-5MB的空间。局部性原理的特征(现代计算机预加载数据的理论基础):允许我们提前加载正在访问数据的相邻或者附近的数据,我们会通过预先加载要访问的数据的附近的数据来减少未来的IO次数,多加载进来的数据本质就叫做数据的预加载。
为什么是4KB呢?IO的基本单位i(内核内存+文件系统)都要提供支持,通过局部原理预测未来的命中情况,提高效率。虚拟地址空间也并不是全部都要用的,也就是从32个0到32个1,而是只需要采用前20个比特位通过页表映射找到页框地址(可以看成是struct page mem[1048576]的一个下标),后12个比特位是记录的一个偏移量[0,4095]刚好对应的是4096个字节也就是4KB的大小,这是不需要页表索引的,虚拟地址的值是对少对应的就是多少个字节的偏移量。
任何一个对象或者变量,可能存在多个字节,但是你会发现你永远取地址的时候,只会拿到一个地址量,汇编过程将会理解成位该变量的其实地址+数据类型(偏移量)。采用的也是基地址+偏移量的机制。
我们实际在申请内存的时候,os只要在虚拟地址空间上申请就行了。当我真正访问的时候,执行的是我的代码,mmu会发现这个地址在虚拟空间上已经申请给我了,但是在物理内存中却没有对应的映射匹配,此时就会发生缺页中断,os系统执行对用中断的处理方法,也就是os才会自动给你申请或者填充页表+申请具体的物理内存。
int g_val = 0; // 全局变量,在多线程场景中,验证我们多个线程看到的是同一个变量! --因为多线程共享地址空间
void *threadRun1(void *args)
{
while (true)
{
sleep(1);
cout << "t1 thread..." << getpid() << " &g_val: " << &g_val << " , g_val: " << g_val << endl;//线程1打印全局变量
}
}
void *threadRun2(void *args)
{
char *s = "hello bit";
while (true)
{
sleep(1);
cout << "t2 thread..." << getpid() << " &g_val: " << &g_val << " , g_val: " << g_val++ << endl;//线程2对全局变量做了修改
if(g_val==5) *s = 'H'; // 让这一个线程崩溃
}
}
int main()
{
pthread_t t1, t2;
pthread_create(&t1, nullptr, threadRun1, nullptr);//创建线程他t1,执行函数threadRun1
pthread_create(&t1, nullptr, threadRun2, nullptr);
while (true)//主线程执行循环
{
sleep(1);
cout << "main thread..." << getpid() << " &g_val: " << &g_val << " , g_val: " << g_val << endl;
}
return 0;
}
现象:在多线程程序当中,任何一个线程崩溃了都会到导致进程奔溃。为什么呢?系统角度:因为线程是进程的执行分支,线程干了就是进程干了。信号角度:页表转换的时候,MMU识别写入权限的,没有验证通过,MMU异常->被操做系统识别,给进程发送信号。 因为执行流看到的资源是通过地址空间看到的,多个LWP看到的是同一个地址空间,所以所有的线程都可能会共享进程的大部分资源。
线程vs进程
线程共享进程数据,但是也拥有自己的一部分数据:一组寄存器(保存上下文数据,因为线程是要被切换的)、栈 、线程、id、调度优先级、errno、信号屏蔽字。
Linux下没有真正意义的线程,而是用进程模拟的线程(LWP)。所以,Linux操作系统不会提供直接创建线程的系统调用,他顶多给我们提供创建轻量级线程的接口。 而从用户的视角只认线程。这就需要中介用户级线程库pthread.h(不改变用户的行为也不改变操作系统的特性)对下将Linux接口封装,对上给用户提供进行线程控制的接口,任何linux操作系统都要自带。
// 线程终止
// 1. 线程函数执行完毕,也就是再函数内补return nullptr
// 2. pthread_exit(void*)
void *thread_run(void *args)
{
char *name = (char*)args;
while (true)
{
cout << "new thread is running, my thread is:" << name << endl;
exit(10); // exit是进程退出,不是线程退出,只要有任何一个线程调用exit,整个进程(所有线程)全部退出!优化就是将这里改写成为pthread_exit(nullptr)
sleep(4);
}
delete name;
return nullptr;
}
#define NUM 10
int main()
{
pthread_t tids[NUM];
for(int i = 0; i < NUM ;i++)
{
char *tname = new char[64];
snprintf(tname, 64, "thread-%d", i+1);
pthread_create(tids+i, nullptr, thread_run, tname);//此时会有多个执行流重入thread_run函数,所以要求thread_run是可重入的。
}
//return 0;如果主线程提前return,而此时正处于循环的所有子线程也会退出。主线程退出可以理解为进程退出,进程退出所有的资源和代码就会被释放,所以其它子线程也就退出了。
//如果一个新线程被创建出来,它是需要被主线程等待的。如果不等待会导致类似于僵尸线程的问题。主线程需要等待新线程并在其退出之后获取新线程的退出码,并且释放新线程退出之后的僵尸状态。
//这样就可以很好的保证是新线程退出之后主线程才退出。
for(int i = 0; i < NUM; i++)
{
int n = pthread_join(tids[i], nullptr);//阻塞等待线程退出
if(n != 0) cerr << "pthread_join error" << endl;//返回值不为零就是等待出错,对线程一般很少采用errno去判断错误的原因,因为无法确定是哪个线程造成的。
}
cout << "all thread quit..." << endl;
return 0;
}
之所以上述线程的名字都是相同的,原因是线程传递的参数并非是tname缓冲区本身而是该缓冲区的起始地址,主线程中tname缓冲区会被被覆盖。
void *thread_run(void *args) { char *name = (char*)args; while (true) { cout << "new thread is running, my thread is:" << name << endl; sleep(4); break; } delete name; pthread_exit((void*)1); } #define NUM 10 int main() { pthread_t tids[NUM]; for(int i = 0; i < NUM ;i++) { char *tname = new char[64]; snprintf(tname, 64, "thread-%d", i+1); pthread_create(tids+i, nullptr, thread_run, tname);//此时会有多个执行流重入thread_run函数,所以要求thread_run是可重入的。 } void * ret = nullptr;//该变量是用来放子线程所执行的函数thread_run的返回值 for(int i = 0; i < NUM; i++) { int n = pthread_join(tids[i], &ret);//int pthread_join(pthread_t thread, void **retval),阻塞等待线程退出,第二个参数是用来存放返回值的。 //pthread_join之所以只接收线程的返回值而不需要考虑线程异常的情况呢?因为一个线程出现异常了,整个进程都会被杀掉,一旦线程异常了,有可能都等不到pthread_join函数拿到线程的返回值整个进程就被干掉了。所以线程不需要考虑异常问题,异常问题是进程需要考虑的。 if(n != 0) cerr << "pthread_join error" << endl;//返回值不为零就是等待出错,对线程一般很少采用errno去判断错误的原因,因为无法确定是哪个线程造成的。 cout << "thread quit: " << (uint64_t)ret << endl; } cout << "all thread quit..." << endl; return 0; }
优化一下上面的代码(线程创建传参以及接收返回值的时候的void*参数不一定只是简单的字符串或者整数,还可以传递类对象,再类内部定义属性和方法,从而让我们的线程执行不同的任务。
//主线程创建子线程传参的时候不一定只能是字符串,还可以传递class对象。因为线程的通信成本并不高。 enum{ OK=0, ERROR }; class ThreadData { public: ThreadData(const string &name, int id, time_t createTime, int top) :_name(name), _id(id), _createTime((uint64_t)createTime),_status(OK), _top(top), _result(0) {} ~ThreadData() {} public: // 输入的 string _name; int _id; uint64_t _createTime; // 返回的,也可以定义一个线程返回的类ResuleData int _status; int _top; int _result; }; //该线程执行的任务是接收主线程传递的类对象,读取其中的top变量,计算从1到top累加的结果。 void *thread_run(void *args) { ThreadData *td = static_cast<ThreadData *>(args);//这也是一种类型强转的写法 cout << "thread is running, name " << td->_name << " create time: " << td->_createTime << " index: " << td->_id << endl; for(int i = 1; i <= td->_top; i++) { td->_result += i; } cout << td->_name << " cal done!" << endl; pthread_exit(td);//返回的时候的指针指向的也是一个类,采用return td的效果也是一样的 } #define NUM 10 int main() { pthread_t tids[NUM]; //创建线程并且分配任务 for(int i = 0; i < NUM ;i++) { char tname[64]; snprintf(tname, 64, "thread-%d", i+1); ThreadData *td = new ThreadData(tname, i+1, time(nullptr), 100+5*i); pthread_create(tids+i, nullptr, thread_run, td);//td是主线程传给子线程的一个地址,该地址指向的变量不一定是字符串,此处指向的是class类型,void*可以接收任何类型的指针。 sleep(1);//让线程创建的时间点都不一样 } void * ret = nullptr; //接收每一个线程的运行结果 for(int i = 0; i < NUM; i++) { int n = pthread_join(tids[i], &ret); if(n != 0) cerr << "pthread_join error" << endl; ThreadData *td = static_cast<ThreadData *>(ret); if(td->_status == OK) { cout << td->_name << " 计算的结果是: " << td->_result << " (它要计算的是[1, " << td->_top << "])" <<endl; } delete td;//释放空间 } cout << "all thread quit..." << endl; return 0; }
//采用给线程发送取消请求来取消一个线程 void *threadRun(void* args) { const char*name = static_cast<const char *>(args);//类型转换的另一种表述形式,相当于const char*name = (const char *)args; int cnt = 5; while(cnt) { cout << name << " is running: " << cnt-- << " obtain self id: " << pthread_self() << endl;//一个线程可以获得自己的线程id,采用的函数就是pthread_self(),返回自己的线程id。 sleep(1); } pthread_exit((void*)11); //如果一个线程是被取消的,该线程的退出结果就是PTHREAD_CANCELED,本质就是-1。因为#define PTHREAD_CANCELED ((void *) -1) } int main() { pthread_t tid; pthread_create(&tid, nullptr, threadRun, (void*)"thread 1"); sleep(3); pthread_cancel(tid);//给线程发送取消请求 void *ret = nullptr; pthread_join(tid, &ret);//该线程等待属于阻塞等待,主线程现在无法做其它的事情。但是线程必须要被等待回收,不然会有僵尸线程。 cout << " new thread exit : " << (int64_t)ret << "quit thread: " << tid << endl;//(int64_t)ret如果写成(int)ret会报错,因为ret是指针类型在当前环境下是8个字节,如果强转称为int类型的话是4个字节,报错存在精度损失。 return 0; }
分离线程
一个线程在创立的时候是默认可以joinable的,一个线程如果被分离了,就无法再被join,函数会报错。
void *threadRoutine(void* args) { // string name = static_cast<const char*>(args); int cnt = 5; while(cnt) { cout << name << " : " << cnt-- << endl; sleep(1); } return nullptr; } int main() { pthread_t tid; pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1"); pthread_detach(tid);//主线程主动分离 int n = pthread_join(tid,nullptr); if( 0 != n ) { std::cerr << "error: " << n << " : " << strerror(n) << std::endl; } sleep(10); return 0; }
主线程join报错是主线程的事情,子线程已经被分离了,子线程对于主线程是不关心的,继续自己跑自己的。void *threadRoutine(void* args) { pthread_detach(pthread_self());//子线程自己分离 string name = static_cast<const char*>(args); int cnt = 5; while(cnt) { cout << name << " : " << cnt-- << endl; sleep(1); } return nullptr; } int main() { pthread_t tid; pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1");//线程被创建的时候,谁先执行不确定。 int n = pthread_join(tid,nullptr);//此时如果子线程还没有执行分离就执行该代码,会导致主线程查看子线程的joinable的属性时目前是可以join的,此时主线程就会挂起阻塞等待子线程退出。 if( 0 != n ) { std::cerr << "error: " << n << " : " << strerror(n) << std::endl; } return 0; }
所以一般分离函数建议写在主线程当中,否则会导致上述子线程已经分离了,但是主线程自己却还不知道。
一般线程分离之后主线程就不用join了,主线程自己去做自己的事情就可以了。
Linux内核没有真正意义上的进程,而是轻量化进程lwp。为了更好的管理的lwp,pthread库就充当了一个中间媒介将lwp封装成了线程,对线程做管理。并且有了管理线程的数据结构TCB。我们就可以拿着在库当中定义的线程控制结构TCB把内核的LWP管理起来。而这些TCB的位置就在内存中pthread库当中存放着。
std::string hexAddr(pthread_t tid) { char buffer[64]; snprintf(buffer, sizeof(buffer), "0x%x", tid);//%x是以十六进制的形式输出整数 return buffer; } void *threadRoutine(void* args) { string name = static_cast<const char*>(args); int cnt = 5; while(cnt) { cout << name << " : " << cnt-- << ":" << hexAddr(pthread_self()) << endl; sleep(1); } return nullptr; } int main() { pthread_t tid; pthread_create(&tid, nullptr, threadRoutine, (void*)"thread 1"); int n = pthread_join(tid,nullptr); while(true) { cout << " main thread: " << hexAddr(pthread_self()) << " new thread id: " << hexAddr(tid) << endl; sleep(1); } if( 0 != n ) { std::cerr << "error: " << n << " : " << strerror(n) << std::endl; } return 0; }
平时我们拿到的线程id其实是库当中该线程对应属性集的起始地址
所有线程都要有自己独立的栈结构,主线程用的是进程系统栈,新线程用的是库中提供的栈。对于任何语言,只要在Linux下面跑和线程相关的代码,底层都要封装原生线程库,语言层面的接口才具有跨平台性,这里的原生线程库只能在Linux下跑,但是如果是语言层面的接口,在编译的时候会进行条件编译,根据所处的平台调用选择不同创建线程的方式,链入不同的库。int g_val = 100; __thread int pthread_g_val = 100;//__thread这是属于gcc或者g++的一个编译选项,此时可以让全局变量pthread_g_val不被所有线程共享, 可以理解为将全局变量pthread_g_val给每一个线程拷贝一个,构建每个线程的局部存储。 //这相当定义了一个只属于某一个线程的数据或变量,可以理解为属于某一个线程的全部变量,比如说统计线程中的某一个函数被调度了多少次。不用局部变量的原因是因为局部变量一用就释放了,而全局变量又会被所有线程共 用。 std::string hexAddr(pthread_t tid) { //pthread_g_val++;//线程局部存储的用法:统计线程中某个函数的调用次数 char buffer[64]; snprintf(buffer, sizeof(buffer), "0x%x", tid);//%x是以十六进制的形式输出整数 return buffer; } void *threadRoutine(void* args) { string name = static_cast<const char*>(args); int cnt = 3;//每一个线程都会创建一个cnt的变量,cnt变量是每个线程私有的,在线程栈当中。 while(cnt) { cout << name << " : " << cnt-- << " : " << hexAddr(pthread_self()) << " &cnt: " << &cnt << endl; cout << name << " g_val: " << g_val++ << ", &g_val: " << &g_val << endl;//不同线程对全局变量做操作,操作的都是同一个全局变量,通过取地址可以得出。因为全局变量在已初始化数据段,不属于线程私有的数据,是被所有线程共享的。 cout << name << " pthread_g_val: " << pthread_g_val++ << ", &pthread_g_val: " << &pthread_g_val << endl; sleep(2); } return nullptr; } int main() { pthread_t t1, t2, t3; pthread_create(&t1, nullptr, threadRoutine, (void*)"thread 1"); // 线程被创建的时候,谁先执行不确定! pthread_create(&t2, nullptr, threadRoutine, (void*)"thread 2"); // 线程被创建的时候,谁先执行不确定! pthread_create(&t3, nullptr, threadRoutine, (void*)"thread 3"); // 线程被创建的时候,谁先执行不确定! pthread_join(t1, nullptr); pthread_join(t2, nullptr); pthread_join(t3, nullptr); return 0; }
互斥与同步
g_val=100(这是一个全局变量);在c语言中g_val--这一句代码会被拆解成为至少3条机器指令:1、将数据从内存move到cpu的寄存器当中;2、cpu对寄存器当中的数据做减减操作;3、将cpu当中寄存器的数据重新写入到内存当中。
对全局变量做--,没有保护的话,会存在并发访问的问题,进而导致数据不一致问题。
int g_val = 100;----共享资源
未来我们对共享资源进行一定的保护,这种共享资源称之为临界资源---衡量共享资源的
我们的任何一个线程,都有代码访问临界资源(临界区),同样的不访问临界资源的区域(非临界区)---很亮线程的代码
我们想让多个线程安全的访问临界资源,加锁、互斥访问。这就让g_val--对应的三条指令看起来就像一条指令---原子性。
//模拟多线程抢票业务 int tickets = 10000; // 这就是临界资源 void *threadRoutine(void *name) { string tname = static_cast<const char*>(name); while(true) { if(tickets > 0) // 此时如果tickets == 1,在该情况下线程a进入到该判断语句然后usleep,在线程ausleep期间(或者线程a还没有执行减减操作就因为时间片到了而被挂起来了),可能线程b也会进入因为此时的tickets还是为1满足该判断条件的,这就导致一个tichet==1这个情况下有多个线程进入到该判断语句,从而导致每一个线程都要做减减的操作。这时候ticket就从1变成0再变成-1。 { usleep(2000); // 模拟抢票花费的时间 cout << tname << " get a ticket: " << tickets-- << endl; } else { break; } } return nullptr; } int main() { pthread_t t[4]; int n = sizeof(t)/sizeof(t[0]); for(int i = 0; i < n; i++) { char *data = new char[64]; snprintf(data, 64, "thread-%d", i+1); pthread_create(t+i, nullptr, threadRoutine, data); } for(int i = 0; i < n; i++) { pthread_join(t[i], nullptr); } return 0; }
由于没多线程之间并非互斥,导致tickets出现负数的情况,优化代码加上锁。int tickets = 1000; // 加锁保证共享资源的安全 pthread_mutex_t mutex; // 先申请锁,全局锁和局部锁(比如定义再main函数的栈上)都可以采用这种方式,但是再使用期间要进行初始化使用之后要将进行销毁,当前该锁处于全局变量区域。 //pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 这是定义锁的另外一种方式,这种方式定义的好处是不需要再初始化以及用完了之后也不需要销毁,但是该锁变量必须属于全局变量。 void *threadRoutine(void *name) { string tname = static_cast<const char*>(name); while(true) { pthread_mutex_lock(&mutex); // 对mutex进行加锁操作,一旦加锁成功就会进入到临界区当中访问临界区代码,如果加锁失败没有锁,就会将当前执行流阻塞住,也就是该执行需等待一下。所有线程都要遵守这个规则 if(tickets > 0) { //a,b,c,d usleep(2000); // 模拟抢票花费的时间 cout << tname << " get a ticket: " << tickets-- << endl; pthread_mutex_unlock(&mutex);//对mutex进行解锁 } else { pthread_mutex_unlock(&mutex); break; } // 后面还有动作 usleep(1000); //充当抢完一张票,后续动作 } return nullptr; } int main() { pthread_mutex_init(&mutex, nullptr);//所在使用之前要进行初始化的操作 pthread_t t[4]; int n = sizeof(t)/sizeof(t[0]); for(int i = 0; i < n; i++) { char *data = new char[64]; snprintf(data, 64, "thread-%d", i+1); pthread_create(t+i, nullptr, threadRoutine, data); } for(int i = 0; i < n; i++) { pthread_join(t[i], nullptr); } pthread_mutex_destroy(&mutex);//锁不需要了要进行销毁 return 0; }
互斥量的实现原理
//和上面的代码没什么太多的差别,只是多定义了一个类TData用来再创建线程的时候传递信息 int tickets = 1000; // 全局变量,共享对象 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 这是我在外边定义的锁 class TData//此时由于锁定义为局部变量,为了让线程可以看到锁,我们定义以个类,方便再线程创建的过程当中进行传递信息。 { public: TData(const string &name, pthread_mutex_t *mutex):_name(name), _pmutex(mutex) {} ~TData() {} public: string _name; pthread_mutex_t *_pmutex; }; void* threadRoutine(void *args) { TData *td = static_cast<TData*>(args); while (true) { pthread_mutex_lock(td->_pmutex); // 加锁,是一个让不让你通过的策略 { if (tickets > 0) { usleep(2000); cout << td->_name << " get a ticket: " << tickets-- << endl; // 临界区 pthread_mutex_unlock(td->_pmutex); } else { pthread_mutex_unlock(td->_pmutex); break; } } // 我们抢完一张票的时候,我们还要有后续的动作 usleep(13); } } int main() { pthread_mutex_t mutex; pthread_mutex_init(&mutex, nullptr); pthread_t tids[4]; int n = sizeof(tids)/sizeof(tids[0]); for(int i = 0; i < n; i++) { char name[64]; snprintf(name, 64, "thread-%d", i+1); TData *td = new TData(name, &mutex); pthread_create(tids+i, nullptr, threadRoutine, td); } for(int i = 0; i < n; i++) { pthread_join(tids[i], nullptr); } pthread_mutex_destroy(&mutex); return 0; }
细节:
1. 凡是访问同一个临界资源的线程,都要进行加锁保护,而且必须加同一把锁,这个是一个游戏规则,不能有例外
2. 每一个线程访问临界区之前,得加锁,加锁本质是给 临界区 加锁,加锁的粒度尽量要细一些
3. 线程访问临界区的时候,需要先加锁->所有线程都必须要先看到同一把锁->锁本身就是公共资源->锁来保护公共资源tickets的安全,锁自己做为公共资源如何保证自己的安全?-> 加锁和解锁本身就是原子的!并不涉及对锁资源竞争的问题,要么不加锁要么加锁成功。
4. 临界区可以是一行代码,可以是一批代码,a. 线程可能被切换吗?当然可能, 不要特殊化加锁和解锁,还有临界区代码,只不过加锁解锁工作可以保证多执行流进行串行访问。
b. 切换会有影响吗?不会,因为在我不在期间,任何人都没有办法进入临界区,因为他无法成功的申请到锁!因为锁被我拿走了!
5. 这也正是体现互斥带来的串行化的表现,站在其他线程的角度,对其他线程有意义的状态就是:锁被我申请(持有锁),锁被我释放了(不持有锁), 原子性就体现在这里
6. 解锁的过程也被设计成为原子的!
7. 锁 的 原理的理解
线程向寄存器写入内容,未来线程切换走的时候,寄存器的内容做为该线程执行流的上下文是要被带走的,再下次切换到该线程时,线程会对上次执行流上下文的数据进行还原,所以寄存器硬件只有一套但是寄存器的内部数据是每一个线程都要有的。
demo版本线程的封装
对线程进行封装,底层还是要调用pthread_create函数接口
//Thread.hpp #pragma once #include <iostream> #include <string> #include <cstdlib> #include <pthread.h> class Thread { public: typedef enum { NEW = 0, RUNNING, EXITED } ThreadStatus;//表示线程的状态 typedef void (*func_t)(void *); public: Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args) { char name[128]; snprintf(name, sizeof(name), "thread-%d", num); _name = name; } int status() { return _status; } std::string threadname() { return _name; } pthread_t threadid() { if (_status == RUNNING) return _tid; else { return 0; } } // 类的成员函数,具有默认参数this,需要static,这时没有所谓的this指针,就可以和pthread_create的第三个参数匹配 // 但是会有新的问题:static成员函数,因为没有this指针导致无法直接访问类属性和其他成员函数 static void *runHelper(void *args) { Thread *ts = (Thread*)args; // args接受的时this指针,就拿到了当前对象 // _func(_args); (*ts)(); return nullptr; } void operator ()() //仿函数 { if(_func != nullptr) _func(_args); } void run() { int n = pthread_create(&_tid, nullptr, runHelper, this);//传递this指针是为了让函数runHelper可以拿到this对象 if(n != 0) exit(1); _status = RUNNING; } void join() { int n = pthread_join(_tid, nullptr); if( n != 0) { std::cerr << "main thread join thread " << _name << " error" << std::endl; return; } _status = EXITED; } ~Thread() { } private: pthread_t _tid;//线程id std::string _name;//线程名字 func_t _func; // 线程未来要执行的回调 void *_args;// ThreadStatus _status; }; //main函数 #include <iostream> #include <string> #include <unistd.h> #include "Thread.hpp" #include "lockGuard.hpp" using namespace std; void threadRun(void *args) { std::string message = static_cast<const char *>(args); int cnt = 10; while (cnt) { cout << "我是一个线程, " << message << ", cnt: " << cnt-- << endl; sleep(1); } } int main() { Thread t1(1, threadRun, (void *)"hellowjj"); cout << "thread name: " << t1.threadname() << " thread id: " << t1.threadid() << ",thread status: " << t1.status() << std::endl; t1.run(); cout << "thread name: " << t1.threadname() << " thread id: " << t1.threadid() << ",thread status: " << t1.status() << std::endl; t1.join(); cout << "thread name: " << t1.threadname() << " thread id: " << t1.threadid() << ",thread status: " << t1.status() << std::endl; return 0; }
demo版锁的封装
有了锁的封装之后,我们以后对于想要对某部风代码进行加锁就会变得非常得简单。如下://lockGuard.hpp #pragma once #include <iostream> #include <pthread.h> class Mutex // 自己不维护锁,有外部传入 { public: Mutex(pthread_mutex_t *mutex):_pmutex(mutex)// 我们的Mutex类 不会定义锁,而是有我们将定义好的锁传进来。 {} void lock() { pthread_mutex_lock(_pmutex); } void unlock() { pthread_mutex_unlock(_pmutex); } ~Mutex() {} private: pthread_mutex_t *_pmutex; }; class LockGuard // 自己不维护锁,有外部传入 { public: LockGuard(pthread_mutex_t *mutex):_mutex(mutex) { _mutex.lock(); } ~LockGuard() { _mutex.unlock(); } private: Mutex _mutex; }; //Thread.hpp #pragma once #include <iostream> #include <string> #include <cstdlib> #include <pthread.h> class Thread { public: typedef enum { NEW = 0, RUNNING, EXITED } ThreadStatus;//表示线程的状态 typedef void (*func_t)(void *); public: Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args) { char name[128]; snprintf(name, sizeof(name), "thread-%d", num); _name = name; } int status() { return _status; } std::string threadname() { return _name; } pthread_t threadid() { if (_status == RUNNING) return _tid; else { return 0; } } // 类的成员函数,具有默认参数this,需要static,这时没有所谓的this指针,就可以和pthread_create的第三个参数匹配 // 但是会有新的问题:static成员函数,因为没有this指针导致无法直接访问类属性和其他成员函数 static void *runHelper(void *args) { Thread *ts = (Thread*)args; // args接受的时this指针,就拿到了当前对象 // _func(_args); (*ts)(); return nullptr; } void operator ()() //仿函数 { if(_func != nullptr) _func(_args); } void run() { int n = pthread_create(&_tid, nullptr, runHelper, this);//传递this指针是为了让函数runHelper可以拿到this对象 if(n != 0) exit(1); _status = RUNNING; } void join() { int n = pthread_join(_tid, nullptr); if( n != 0) { std::cerr << "main thread join thread " << _name << " error" << std::endl; return; } _status = EXITED; } ~Thread() { } private: pthread_t _tid;//线程id std::string _name;//线程名字 func_t _func; // 线程未来要执行的回调 void *_args;// ThreadStatus _status; }; //main.cpp #include <iostream> #include <string> #include <unistd.h> #include "Thread.hpp" #include "lockGuard.hpp" using namespace std; // // 临界资源 int tickets = 1000; // 全局变量,共享对象 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 这是我在外边定义的锁 void threadRoutine(void *args) { std::string message = static_cast<const char *>(args); while (true) { { LockGuard lockguard(&mutex); // RAII 风格的锁,我们用lockGaurd类定义了一个临时对象,临时对象对传入进来的参数进行了临时的加锁操作,当while循环一次之后,也就是代码块结束之后,临时对象会自动被释放就会自动调用临时对象的析构函数,从而实现解锁。 if (tickets > 0) { usleep(2000); cout << message << " get a ticket: " << tickets-- << endl; // 临界区 } else { break; } } } } int main() { Thread t1(1, threadRoutine, (void *)"hellobit1"); Thread t2(2, threadRoutine, (void *)"hellobit2"); Thread t3(3, threadRoutine, (void *)"hellobit3"); Thread t4(4, threadRoutine, (void *)"hellobit4"); t1.run(); t2.run(); t3.run(); t4.run(); t1.join(); t2.join(); t3.join(); t4.join(); return 0; }
adfasdf asdfasdf { LockGuard lockguard(&mutex); // RAII 风格的锁 asdf asdf } asdfas dfasd fasdfasdf
不可重入函数要求函数实现中使用的都是函数栈上面得变量,不要使用全局变量或者静态变量。加锁和解锁可以跨线程进行,当让大部分情况是哪个线程加锁就要由哪个线程进行解锁,此时锁可以控制另一个线程得行为。
条件变量
其出现是为了避免某一个线程由于临界资源条件不满足不断地申请释放锁也就是向临界资源做检测,其它线程从而无法获取锁,导致资源的浪费问题。条件变量允许多线程在cond中队列式等待(就是一种顺序),从而实现线程同步。
const int num = 5; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//采用全局的方式定义条件变量,省去了初始化和释放得过程,也可以定义局部的。 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; void *active(void *args) { string name = static_cast<const char *>(args); while (true) { pthread_mutex_lock(&mutex); pthread_cond_wait(&cond, &mutex); // 这里就判断条件是否满足,直接默认不满足,也就是线程进入到该区间默认条件不满足,直接将该线程投递到条件变量让它去等。调用的时候,会自动释放锁, 总不能抱着锁去等待吧,否则其他线程想进来也进不来。TODO cout << name << " 活动" << endl; pthread_mutex_unlock(&mutex); } } int main() { pthread_t tids[num]; for (int i = 0; i < num; i++) { char *name = new char[32]; snprintf(name, 32, "thread-%d", i + 1); pthread_create(tids + i, nullptr, active, name); } sleep(3); //此时各个线程都在条件变量cond下等待,主线程不唤醒子线程,子线程会一直处于休眠状态。 while (true) { cout << "main thread wakeup thread..." << endl; pthread_cond_signal(&cond);//被唤醒之后该线程会从当时进入睡眠状态的代码处向后运行。被唤醒的线程按照再条件变量队列当中的顺序一次被唤醒 //pthread_cond_broadcast(&cond); sleep(1); } for (int i = 0; i < num; i++) { pthread_join(tids[i], nullptr); } }
条件变量允许多线程在cond中进行队列式等待(这就是一种顺序)
生产者消费者模型
多生产者和多消费者的高效性并不是体现在向队列中那东西或者放入东西,而是消费者生产数据的时候会有多线程过来拿数据或者消费者处理数据的时候多线程来放入数据。
阻塞队列
单生产者和单消费者向队列中放入整型
//blockQueue.hpp #pragma once #include <iostream> #include <queue> #include <pthread.h> const int gcap = 5; // 不要认为,阻塞队列只能放整数字符串之类的 // 也可以放对象 template <class T> class BlockQueue { public: BlockQueue(const int cap = gcap):_cap(cap)//初始化阻塞队列的容量 { pthread_mutex_init(&_mutex, nullptr);//对所进行初始化 pthread_cond_init(&_consumerCond, nullptr);//对条件变量进行初始化 pthread_cond_init(&_productorCond, nullptr); } bool isFull(){ return _q.size() == _cap; } bool isEmpty() { return _q.empty(); } void push(const T &in) { //向队列中push数据的第一件事是向整个队列中进行加锁工作,防止我打算生产的时候有人正在消费 pthread_mutex_lock(&_mutex); // 细节1:一定要保证,在任何时候,都是符合条件,才进行生产 while(isFull()) // 1. 我们只能在临界区内部,判断临界资源是否就绪!因为临界资源是被多线程共享的,所以判断临界资源(本身也是对临界资源的一种访问)是否就绪就需要在临界区中(也就是加锁和解锁之间)进行,注定了我们在当前一定是持有锁的!这里不适用if判断是害怕发生这种情况:有一个消费线程和5个生产线程,如果消费线程消费一个之后采用broad将所有的生产线程同时唤醒,这时候就需要往队列中push5次数据,但是队列中可能只剩下一个位置没有被填充,这就会导致线程错误。 { // 2. 要让线程进行休眠等待,不能持有锁等待! // 3. 注定了,pthread_cond_wait要有锁的释放的能力! pthread_cond_wait(&_productorCond, &_mutex); // 当前是满的,就不能够再生产了,于是再自己的条件变量下进行休眠,wait函数一执行就会释放锁。我休眠了(可以理解为线程被切换),我醒来的时候,在哪里往后执行呢? // 4. 当线程醒来的时候,注定了继续从临界区内部继续运行!因为我是在临界区被切走的! // 5. 注定了当线程被唤醒的时候,继续在pthread_cond_wait函数处向后运行,又要重新申请锁,申请成功才会彻底返回 } // 队列没有满,就要让他进行生产 _q.push(in);//执行完该行代码可以保证队列为非空 // 加策略 //if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);//生产者唤醒消费者,但是此时消费者是否休眠不得而知。 pthread_mutex_unlock(&_mutex); // pthread_cond_signal(&_consumerCond); } void pop(T *out) { pthread_mutex_lock(&_mutex); while(isEmpty()) { pthread_cond_wait(&_consumerCond, &_mutex); } *out = _q.front(); _q.pop();//执行完该行代码至少可以保证队列没有满 // 加策略 pthread_cond_signal(&_productorCond);//消费者唤醒生产者,唤醒行为可以放在释放锁前面或者后面都可以 pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex);//销毁锁 pthread_cond_destroy(&_consumerCond);//销毁条件变量 pthread_cond_destroy(&_productorCond); } private: std::queue<T> _q; int _cap;//队列的容量上限 // 为什么我们的这份代码,只用一把锁呢,根本原因在于, // 我们生产和消费访问的是同一个queue&&queue被当做整体使用! pthread_mutex_t _mutex; //不论是生产者还是消费者,两个访问的都是同一个队列,所以要避免两个访问同一份资源,为了防止有人向队列放数据的同时又有人像队列中拿数据,在这里定义一个锁,而且只能有一把,因为访问的是消费者和生产者访问的是同一份临界资源,所以定义一把锁就可以了。 pthread_cond_t _consumerCond; // 消费者对应的条件变量,空,wait,防止消费者加锁发现队列为空释放锁然后继续加锁···,这种不断地循环造成的浪费。 pthread_cond_t _productorCond; // 生产者对应的条件变量,满,wait }; //main.cc void *consumer(void *args) { BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); while (true) { sleep(1);//休眠1s导致我们执行代码的时候队列一下子就被生产满了吗,然后就是消费一个生产一个。 int data = 0; // 1. 将数据从blockqueue中获取 -- 获取到了数据 bq->pop(&data); // 2. 结合某种业务逻辑,处理数据! -- TODO std::cout << " consumer data: " << data << std::endl; } } void *productor(void *args) { BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); while (true) { // sleep(1); // 1. 先通过某种渠道获取数据 int data = rand() % 10 + 1; // 2. 将数据推送到blockqueue -- 完成生产过程 bq->push(data); std::cout << " productor Task: " << data << std::endl; } } int main() { srand((uint64_t)time(nullptr) ^ getpid()); BlockQueue<int> *bq = new BlockQueue<int>(); // 单生产和单消费(只需要维护一种关系也就是生产者和消费者之间的关系) -> 多生产和多消费(维护三种关系) pthread_t c, p; pthread_create(&c, nullptr, consumer, bq); pthread_create(&p, nullptr, productor, bq); pthread_join(c, nullptr); pthread_join(p, nullptr); delete bq; return 0; }
多生产多消费向阻塞队列中放入任务
//blockQueue.hpp #pragma once #include <iostream> #include <queue> #include <pthread.h> const int gcap = 5; // 不要认为,阻塞队列只能放整数字符串之类的 // 也可以放对象 template <class T> class BlockQueue { public: BlockQueue(const int cap = gcap):_cap(cap)//初始化阻塞队列的容量 { pthread_mutex_init(&_mutex, nullptr);//对所进行初始化 pthread_cond_init(&_consumerCond, nullptr);//对条件变量进行初始化 pthread_cond_init(&_productorCond, nullptr); } bool isFull(){ return _q.size() == _cap; } bool isEmpty() { return _q.empty(); } void push(const T &in) { //向队列中push数据的第一件事是向整个队列中进行加锁工作,防止我打算生产的时候有人正在消费 pthread_mutex_lock(&_mutex); // 细节1:一定要保证,在任何时候,都是符合条件,才进行生产 if(isFull()) // 1. 我们只能在临界区内部,判断临界资源是否就绪!因为临界资源是被多线程共享的,所以判断临界资源(本身也是对临界资源的一种访问)是否就绪就需要在临界区中(也就是加锁和解锁之间)进行,注定了我们在当前一定是持有锁的! { // 2. 要让线程进行休眠等待,不能持有锁等待! // 3. 注定了,pthread_cond_wait要有锁的释放的能力! pthread_cond_wait(&_productorCond, &_mutex); // 当前是满的,就不能够再生产了,于是再自己的条件变量下进行休眠,wait函数一执行就会释放锁。我休眠了(可以理解为线程被切换),我醒来的时候,在哪里往后执行呢? // 4. 当线程醒来的时候,注定了继续从临界区内部继续运行!因为我是在临界区被切走的! // 5. 注定了当线程被唤醒的时候,继续在pthread_cond_wait函数处向后运行,又要重新申请锁,申请成功才会彻底返回 } // 队列没有满,就要让他进行生产 _q.push(in);//执行完该行代码可以保证队列为非空 // 加策略 //if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);//生产者唤醒消费者,但是此时消费者是否休眠不得而知。 pthread_mutex_unlock(&_mutex); // pthread_cond_signal(&_consumerCond); } void pop(T *out) { pthread_mutex_lock(&_mutex); if(isEmpty()) { pthread_cond_wait(&_consumerCond, &_mutex); } *out = _q.front(); _q.pop();//执行完该行代码至少可以保证队列没有满 // 加策略 pthread_cond_signal(&_productorCond);//消费者唤醒生产者,唤醒行为可以放在释放锁前面或者后面都可以 pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex);//销毁锁 pthread_cond_destroy(&_consumerCond);//销毁条件变量 pthread_cond_destroy(&_productorCond); } private: std::queue<T> _q; int _cap;//队列的容量上限 // 为什么我们的这份代码,只用一把锁呢,根本原因在于, // 我们生产和消费访问的是同一个queue&&queue被当做整体使用! pthread_mutex_t _mutex; //不论是生产者还是消费者,两个访问的都是同一个队列,所以要避免两个访问同一份资源,为了防止有人向队列放数据的同时又有人像队列中拿数据,在这里定义一个锁。 pthread_cond_t _consumerCond; // 消费者对应的条件变量,空,wait,防止消费者加锁发现队列为空释放锁然后继续加锁···,这种不断地循环造成的浪费。 pthread_cond_t _productorCond; // 生产者对应的条件变量,满,wait }; //task.hpp #pragma once #include <iostream> #include <string> class Task { public: Task() { } Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0) { } void operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': { if (_y == 0) _exitCode = -1; else _result = _x / _y; } break; case '%': { if (_y == 0) _exitCode = -2; else _result = _x % _y; } break; default: break; } } std::string formatArg() { return std::to_string(_x) + _op + std::to_string(_y) + "="; } std::string formatRes() { return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")"; } ~Task() { } private: int _x; int _y; char _op; int _result; int _exitCode; }; //main.cc #include "blockQueue.hpp" #include "task.hpp" #include <pthread.h> #include <unistd.h> #include <ctime> void *consumer(void *args) { BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); while (true) { sleep(1);//休眠1s导致我们执行代码的时候队列一下子就被生产满了吗,然后就是消费一个生产一个。 int data = 0; // 1. 将数据从blockqueue中获取 -- 获取到了数据 bq->pop(&data); // 2. 结合某种业务逻辑,处理数据! -- TODO std::cout << " consumer data: " << data << std::endl; } } void *productor(void *args) { BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args); while (true) { // sleep(1); // 1. 先通过某种渠道获取数据 int data = rand() % 10 + 1; // 2. 将数据推送到blockqueue -- 完成生产过程 bq->push(data); std::cout << " productor Task: " << data << std::endl; } } int main() { srand((uint64_t)time(nullptr) ^ getpid()); BlockQueue<int> *bq = new BlockQueue<int>(); // 单生产和单消费(只需要维护一种关系也就是生产者和消费者之间的关系) -> 多生产和多消费(维护三种关系) pthread_t c, p; pthread_create(&c, nullptr, consumer, bq); pthread_create(&p, nullptr, productor, bq); pthread_join(c, nullptr); pthread_join(p, nullptr); delete bq; return 0; }
//makefile cp:main.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f cp
信号量
本质就是一个计数器,用来衡量临界资源中的资源数目。信号量需要进行PV操作,p操作就是--操作,v操作就是++操作,而pv操作一定要是原子的。当临界资源中的资源数目为1的时候,叫做二元信号量,也就是互斥锁。如果临界资源中的资源数目大于或者等于2,就可以定义一个多元信号量,每一个线程在访问对应的资源的时候,先申请信号量,申请成功,表示该线程允许使用该资源,如果申请不成功,就表示目前无法使用该资源。信号量的工作机制:信号量机制类似于我们看电影买票,是一种资源的预定机制。 信号量已经是资源的计数器了,申请信号量成功,本身就是资源可用,申请信号量失败本身表明资源不可用--本质就是把判断转换成为信号量的申请行为。
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);//模拟处理一个任务所需要的时间
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
static const int N = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int num = N) : _ring(num), _cap(num)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, num);
_c_step = _p_step = 0;//生产者和消费者初始化的时候下标都是0
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 生产
void push(const T &in)
{
// 1. 可以不用在临界区内部做判断,就可以知道临界资源的使用情况
// 2. 什么时候用锁,什么时候用sem?你对应的临界资源,是否被整体使用!
P(_space_sem); // P();在生产之前先要进行p操作,现在使用信号量不需要判断是否还有临界资源,因为信号量是一把计数器,本质工作对资源的就绪情况从临界区内转移到了临界区外面。在之前互斥锁那边判断资源是否就绪首先要检查资源,所以得现持有锁,然后再访问资源。而信号量本身就是描述临界资源数量的,所以信号量就省去了加锁进入临界资源判断临界资源数量的工作了。
Lock(_p_mutex); //? 1
// 一定有对应的空间资源给我!不用做判断,是哪一个呢?
_ring[_p_step++] = in;//生产数据
_p_step %= _cap;//维护环状队列的环状特点
Unlock(_p_mutex);
V(_data_sem);//对数据进行v操作,也就是数据量加加
}
// 消费
void pop(T *out)
{
P(_data_sem);
Lock(_c_mutex); //?
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);//数据信号量减减就意味着空间资源多了一个
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap; // 环形队列容器大小
sem_t _data_sem; // 只有消费者关心
sem_t _space_sem; // 只有生产者关心
int _c_step; // 消费位置
int _p_step; // 生产位置,需要两个位置下标是因为生产者和消费者访问的位置是不一样的。
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <pthread.h>
#include <memory>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
using namespace std;
const char *ops = "+-*/%";
void *consumerRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatRes() << endl;
}
}
void *productorRoutine(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % strlen(ops)];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArg() << endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
// 单生产单消费
// pthread_t c, p;
// pthread_create(&c, nullptr, consumerRoutine, rq);
// pthread_create(&p, nullptr, productorRoutine, rq);
// pthread_join(c, nullptr);
// pthread_join(p, nullptr);
// 多生产,多消费,该如何更改代码呢?done
// 意义在哪里呢?意义绝对不在从缓冲区冲放入和拿去,意义在于,放前并发构建Task,获取后多线程可以并发处理task,因为这些操作没有加锁!
pthread_t c[3], p[2];
for (int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for (int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for (int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for (int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
delete rq;
return 0;
}
makefile
ringqueue:Main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f ringqueue