文章目录
- 信号量和条件变量实践
- 1.实验基本环境
- (1).基本系统环境
- 2.信号量
- (1).如何使用信号量?
- (2).课上的例子
- (3).打印合法括号序列
- (4).打印很多条鱼
- 3.条件变量
- (1).为什么选择条件变量?
- (2).还是课上的例子
- (3).还是合法括号序列
- (4).还是打印很多鱼
- 总结
信号量和条件变量实践
1.实验基本环境
(1).基本系统环境
这次实验我采用了和上一次相同的运行着Ubuntu-22.04的云服务器完成,它的基本信息如下:
上周实验之后实际上我也在macOS上尝试了类似的工作,但是发现macOS上的pthread.h其中竟然是不包含pthread_spinlock_t,也就是内置自旋锁变量的,虽然用mutex能做到相同的效果,所以这可能也说明,linux上的pthread会更加符合我们平常使用的习惯
2.信号量
(1).如何使用信号量?
使用semaphore.h头文件就可以了,首先信号量对象的类型是sem_t,之后sem_wait函数作为P操作,sem_post函数作为V操作,只要这样,我们就可以完成信号量的所有操作了
(2).课上的例子
课上的例子是对于以下的四个节点,假设每个节点对应打印自己的字母,因此在下图的情况下:ABCD和ACBD都是合法的序列,我们需要靠四个线程的同步来打印合法序列
在这里设计五个信号量: B A , B C , D B , D C , D A BA,BC,DB,DC,DA BA,BC,DB,DC,DA,增加信号量 D A DA DA是因为这里希望可以循环打印出我们需要的结果,因此我们可以很轻松地设计出如下的代码:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define NUMS 4
sem_t ba, ca, db, dc, ad;
void init()
{
sem_init(&ba, 0, 0);
sem_init(&ca, 0, 0);
sem_init(&db, 0, 0);
sem_init(&dc, 0, 0);
sem_init(&ad, 0, 1);
}
void* a()
{
while (1) {
sem_wait(&ad);
printf("a");
sem_post(&ba);
sem_post(&ca);
}
pthread_exit(NULL);
}
void* b()
{
while (1) {
sem_wait(&ba);
printf("b");
sem_post(&db);
}
pthread_exit(NULL);
}
void* c()
{
while (1) {
sem_wait(&ca);
printf("c");
sem_post(&dc);
}
pthread_exit(NULL);
}
void* d()
{
while (1) {
sem_wait(&db);
sem_wait(&dc);
printf("d_");
sem_post(&ad);
}
pthread_exit(NULL);
}
int main()
{
init();
pthread_t thr[NUMS];
pthread_create(&thr[0], NULL, a, NULL);
pthread_create(&thr[1], NULL, b, NULL);
pthread_create(&thr[2], NULL, c, NULL);
pthread_create(&thr[3], NULL, d, NULL);
for (int i = 0; i < NUMS; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
效果很明显是对的,所有的V操作都可以任意交换,但是P操作不是,不过在这里d线程不会受到影响,因此对于db和dc的两个P操作是可以任意交换的,它们的顺序不会影响执行,不会产生死锁,不过这里偷懒只用了四个线程,实际上应该再用一个打印下划线的线程完成下划线的打印,因此我又写了这段代码:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define NUMS 5
sem_t ba, ca, db, dc, ed, ae;
void init()
{
sem_init(&ba, 0, 0);
sem_init(&ca, 0, 0);
sem_init(&db, 0, 0);
sem_init(&dc, 0, 0);
sem_init(&ed, 0, 0);
sem_init(&ae, 0, 1);
}
void* a()
{
while (1) {
sem_wait(&ae);
printf("a");
sem_post(&ba);
sem_post(&ca);
}
pthread_exit(NULL);
}
void* b()
{
while (1) {
sem_wait(&ba);
printf("b");
sem_post(&db);
}
pthread_exit(NULL);
}
void* c()
{
while (1) {
sem_wait(&ca);
printf("c");
sem_post(&dc);
}
pthread_exit(NULL);
}
void* d()
{
while (1) {
sem_wait(&db);
sem_wait(&dc);
printf("d");
sem_post(&ed);
}
pthread_exit(NULL);
}
void* underline()
{
while (1) {
sem_wait(&ed);
printf("_");
sem_post(&ae);
}
pthread_exit(NULL);
}
int main()
{
init();
pthread_t thr[NUMS];
pthread_create(&thr[0], NULL, a, NULL);
pthread_create(&thr[1], NULL, b, NULL);
pthread_create(&thr[2], NULL, c, NULL);
pthread_create(&thr[3], NULL, d, NULL);
pthread_create(&thr[4], NULL, underline, NULL);
for (int i = 0; i < NUMS; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
因为整个计算图增加了两条边,因此我又增加了两个信号量,它也正常地完成了工作,因此对于一个简单的计算图同步问题,我们可以使用画出计算图,然后根据边来设计对应的信号量
(3).打印合法括号序列
这其实是一个典型的生产者—消费者问题,打印"(“的作为生产者,当存在”(“的时候可以打印”)“,则打印”)“的为消费者,同时还要保证”(“的数量不能超过一个定值,比如3,4这样,限制深度,这里要求的是两个死循环线程分别打印”(“和”)",需要使用信号量的方式完成两个线程的同步,从而使得不出现非法序列以及不能使得括号嵌套深度超过设定的深度。
对于这个问题,因为信号量本身并不是一个仅能为0/1的值,因此我们可以利用信号量的这一特点,设计以下的代码:
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#define NUMS 8
sem_t fill, empty;
void init(int k)
{
sem_init(&empty, 0, k);
sem_init(&fill, 0, 0);
}
void* Tproducer()
{
while (1) {
sem_wait(&empty);
printf("(");
sem_post(&fill);
}
pthread_exit(NULL);
}
void* Tconsumer()
{
while (1) {
sem_wait(&fill);
printf(")");
sem_post(&empty);
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
assert(argc == 2);
init(atoi(argv[1]));
pthread_t thr[NUMS * 2 + 1];
for (int i = 0; i < NUMS; i+=2) {
pthread_create(&thr[i], NULL, Tproducer, NULL);
pthread_create(&thr[i+1], NULL, Tconsumer, NULL);
}
for (int i = 0; i < NUMS * 2; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
这个程序在运行时会接收一个额外的参数,这个参数作为括号嵌套的最大深度,如果是4,则代表不能超过4层括号嵌套,在设计的时候采用了两个信号量,本来我还担心说这个信号量可能需要比较来完成深度检验,但是发现如果采用fill+empty=MAX_DEPTH这样的两个信号量,一定可以保证嵌套深度不可能大于MAX_DEPTH,这也是我认为这个写法最巧妙的地方,所以它也的确可以完成任务:
例如对于允许8层嵌套的这次执行,可以发现最深的嵌套次数的确没有超过8次,当然我们运行再久也不能说明这个程序就是对的,但是从比较长期的运行来看,它的正确性应当是可以保证的。
(4).打印很多条鱼
NJU的jyy老师上课时有这么个例子,有这样三个线程,分别死循环打印"<“,”>“和”_“,我们需要做的就是让三个线程同步,从而打印出”<><_“或是”><>_“这样类似鱼的图形,向左还是向右无所谓,但是不能出现非法情况,例如”<<>><_"并不是一条合法的鱼。
一条鱼长大了,就不再是合法的鱼了。 —— 我自己说的
所以这个问题如果要用信号量实现,我们应该设计出它的一个状态转移图:
麻烦的是,如果单从状态来说,按照每一条边一个信号量的原则,这个实现起来相当复杂(我们需要9个信号量),但是仔细观察可以发现,<和>一定是交替出现的,并且出现三次后一定会打印_,因此或许我们可以只用三个信号量来完成这个操作:
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
sem_t fill, empty, fin;
int f_val;
void init()
{
sem_init(&empty, 0, 1);
sem_init(&fill, 0, 0);
sem_init(&fin, 0, 3);
}
void* Tleft()
{
while (1) {
sem_wait(&empty);
sem_wait(&fin);
printf("<");
sem_post(&fill);
}
pthread_exit(NULL);
}
void* Tright()
{
while (1) {
sem_wait(&fill);
sem_wait(&fin);
printf(">");
sem_post(&empty);
}
pthread_exit(NULL);
}
void* final()
{
while (1) {
sem_getvalue(&fin, &f_val);
if (f_val == 0) {
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
printf("_");
int ran = rand() % 2;
switch (ran) {
case 0:
sem_post(&empty);
break;
case 1:
sem_post(&fill);
break;
}
sem_init(&fin, 0, 3);
}
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
init();
pthread_t thr[3];
pthread_create(&thr[0], NULL, Tleft, NULL);
pthread_create(&thr[1], NULL, Tright, NULL);
pthread_create(&thr[2], NULL, final, NULL);
for (int i = 0; i < 3; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
非常好的尝试,完全跑不起来,一定存在某个地方死锁了,所以这个时候我们首先尝试最基本的方式:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
sem_t a, b, c, d, e, f, g, h, FIN;
int ran;
void init()
{
sem_init(&a, 0, 0);
sem_init(&e, 0, 0);
sem_init(&b, 0, 0);
sem_init(&c, 0, 0);
sem_init(&d, 0, 0);
sem_init(&f, 0, 0);
sem_init(&g, 0, 0);
sem_init(&h, 0, 0);
sem_init(&FIN, 0, 1);
}
void* Ta()
{
while (1) {
sem_wait(&a);
printf("<");
sem_post(&b);
}
pthread_exit(NULL);
}
void* Tb()
{
while (1) {
sem_wait(&b);
printf(">");
sem_post(&c);
}
pthread_exit(NULL);
}
void* Tc()
{
while (1) {
sem_wait(&c);
printf("<");
sem_post(&d);
}
pthread_exit(NULL);
}
void* Td()
{
sem_wait(&d);
printf("_");
sem_post(&FIN);
pthread_exit(NULL);
}
void* Te()
{
while (1) {
sem_wait(&e);
printf(">");
sem_post(&f);
}
pthread_exit(NULL);
}
void* Tf()
{
while (1) {
sem_wait(&f);
printf("<");
sem_post(&g);
}
pthread_exit(NULL);
}
void* Tg()
{
while (1) {
sem_wait(&g);
printf(">");
sem_post(&h);
}
pthread_exit(NULL);
}
void* Th()
{
while (1) {
sem_wait(&h);
printf("_");
sem_post(&FIN);
}
pthread_exit(NULL);
}
void* Ti()
{
while (1) {
sem_wait(&FIN);
ran = rand() % 2;
if (ran) {
sem_post(&a);
}
else {
sem_post(&e);
}
}
pthread_exit(NULL);
}
int main()
{
init();
pthread_t thr[10];
pthread_create(&thr[0], NULL, Ta, NULL);
pthread_create(&thr[1], NULL, Tb, NULL);
pthread_create(&thr[2], NULL, Tc, NULL);
pthread_create(&thr[3], NULL, Td, NULL);
pthread_create(&thr[4], NULL, Te, NULL);
pthread_create(&thr[5], NULL, Tf, NULL);
pthread_create(&thr[6], NULL, Tg, NULL);
pthread_create(&thr[7], NULL, Th, NULL);
pthread_create(&thr[8], NULL, Ti, NULL);
for (int i = 0; i < 9; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
这里的尝试就比较简单,我们根据前面的图抽象出了一个生产者—消费者模型完成了所有信号量的设计,但是这个尝试依旧没有成功,应该还是有什么地方被锁住了,但是我不太清楚,因此我又回到上一段代码开始进行尝试,经过长时间的GDB调试等,最后我确定问题应该出在这里:
void* final()
{
while (1) {
sem_getvalue(&fin, &f_val);
if (f_val == 0) {
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
printf("_");
int ran = rand() % 2;
switch (ran) {
case 0:
sem_post(&empty);
break;
case 1:
sem_post(&fill);
break;
}
sem_init(&fin, 0, 3);
}
}
pthread_exit(NULL);
}
我当时的预期是,从信号量fin处取得值之后再对该值进行判断,如果这个信号量对应的计数已经清零,则此时应该将empty、fill重置,随机选择,最后再将fin置为3开始下一轮打印。但这段代码最大的问题在于:它开始操作的阶段是无锁的,虽然我还没有完全分析出在这个位置可能会出什么问题,但我感觉这样的写法和纯粹的P-V操作是不匹配的
因此我重新设计了对于打印3次后中止然后开始打印_这个需求的解决方案,对于这个点,可以设计一个临界变量cnt,它的作用是记录当前还剩的可打印次数,因为是简单的临界区变量,所以我将fin信号量换成了mutex,作为对于cnt变量的互斥锁,所以一开始我写出的代码是这样的:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
sem_t fill, empty, mutex;
int cnt;
void init()
{
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
sem_init(&mutex, 0, 1);
cnt = 0;
}
void* Tleft()
{
while (1) {
sem_wait(&empty);
sem_wait(&mutex);
cnt--;
printf("<");
sem_post(&mutex);
}
pthread_exit(NULL);
}
void* Tright()
{
while (1) {
sem_wait(&fill);
sem_wait(&mutex);
cnt--;
printf(">");
sem_post(&mutex);
}
pthread_exit(NULL);
}
void* final()
{
while (1) {
sem_wait(&mutex);
cnt = 3;
sem_post(&mutex);
printf("_");
int ran = rand() % 2;
switch (ran) {
case 0:
sem_post(&empty);
break;
case 1:
sem_post(&fill);
break;
}
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
init();
pthread_t thr[3];
pthread_create(&thr[0], NULL, Tleft, NULL);
pthread_create(&thr[1], NULL, Tright, NULL);
pthread_create(&thr[2], NULL, final, NULL);
for (int i = 0; i < 3; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
看起来好像没问题了,但是这段代码实际上完全没有同步:
在这里,我没有对两个线程得到了cnt之后做的事情做任何的限制,这也导致了cnt看似有效,实则完全无效的问题,因此我又把代码改成这样:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
sem_t fill, empty, mutex;
int cnt;
void init()
{
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
sem_init(&mutex, 0, 1);
cnt = 0;
}
void* Tleft()
{
while (1) {
sem_wait(&empty);
sem_wait(&mutex);
if (cnt) {
printf("<");
cnt--;
}
sem_post(&mutex);
}
pthread_exit(NULL);
}
void* Tright()
{
while (1) {
sem_wait(&fill);
sem_wait(&mutex);
if (cnt) {
printf("<");
cnt--;
}
sem_post(&mutex);
}
pthread_exit(NULL);
}
void* final()
{
while (1) {
sem_wait(&mutex);
if (cnt == 0) {
cnt = 3;
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
printf("_");
int ran = rand() % 2;
switch (ran) {
case 0:
sem_post(&empty);
break;
case 1:
sem_post(&fill);
break;
}
}
sem_post(&mutex);
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
init();
pthread_t thr[3];
pthread_create(&thr[0], NULL, Tleft, NULL);
pthread_create(&thr[1], NULL, Tright, NULL);
pthread_create(&thr[2], NULL, final, NULL);
for (int i = 0; i < 3; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
这次的代码终于对final进行了限制,但是运行起来会直接导致死锁,实际上这是我对sem_init的一个误解,因为最关键的事情是我需要让empty和fill这两个信号量在final中都被清零,但是对于一个已初始化的信号量使用sem_init是未定义行为,因此这样的做法是错误的,所以我得让这两个线程在一共打印3次之后就一定会把empty和fill都清零,所以就有了最后的代码:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
sem_t fill, empty, mutex;
int cnt;
void init()
{
sem_init(&empty, 0, 0);
sem_init(&fill, 0, 0);
sem_init(&mutex, 0, 1);
cnt = 0;
}
void* Tleft()
{
int conti;
while (1) {
sem_wait(&empty);
sem_wait(&mutex);
if (cnt) {
conti = cnt;
cnt--;
}
else {
conti = 0;
}
if (conti == 0) {
sem_post(&mutex);
continue;
}
printf("<");
sem_post(&mutex);
if (conti > 1) sem_post(&fill);
}
pthread_exit(NULL);
}
void* Tright()
{
int conti;
while (1) {
sem_wait(&fill);
sem_wait(&mutex);
if (cnt) {
conti = cnt;
cnt--;
}
else {
conti = 0;
}
if (conti == 0) {
sem_post(&mutex);
continue;
}
printf(">");
sem_post(&mutex);
if (conti > 1) sem_post(&empty);
}
pthread_exit(NULL);
}
void* final()
{
int conti;
while (1) {
sem_wait(&mutex);
if (cnt == 0) {
conti = 1;
cnt = 3;
}
else {
conti = 0;
}
sem_post(&mutex);
if (conti == 0) continue;
printf("_");
int ran = rand() % 2;
switch (ran) {
case 0:
sem_post(&empty);
break;
case 1:
sem_post(&fill);
break;
}
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
init();
pthread_t thr[3];
pthread_create(&thr[0], NULL, Tleft, NULL);
pthread_create(&thr[1], NULL, Tright, NULL);
pthread_create(&thr[2], NULL, final, NULL);
for (int i = 0; i < 3; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
经过长期运行,没有观察到非法的输出,到这里应该也就算是成功了,这里的设计相比之前最大的区别实际上是对于每个线程都会有一个局部的conti变量,这个变量能够保证在需要跳过循环的时候直接中断循环跳过,这导致我们可以在cnt清零的时候让线程主动放弃自己曾经拿到的empty或者fill信号量,最后在cnt清零进入final的时候,一定可以保证empty和fill信号量是0,所以这时候再进行赋值就不会出问题了。
不得不说,写信号量是真难啊,前面两个简单的计算图问题只用了大概20分钟就解决了,但是最后这个问题用信号量之后我de了两个小时的bug才终于大概是解决了问题。
3.条件变量
(1).为什么选择条件变量?
信号量本身是一个比较复杂的事情,对于不同的问题需要设计不同的信号量来解决问题,而条件变量比较简单,只要能够确定继续执行的条件,一定能比较方便快速地写出对应的代码,因此在对于信号量的实验之后,我打算再用条件变量来完成相同的操作。
(2).还是课上的例子
我们已经有了那个例子的状态转移图,理论上讲,按照条件变量的实现思路,我们只需要将状态转移是否合法作为判断下一步的条件即可,因此我们可以用下面的结构体来实现合法状态转移的表示:
enum { A = 1, B, C, D, E, F };
struct rule
{
int from, ch, to;
};
struct rule rules[] = {
{ A, 'A', B },
{ B, 'B', C },
{ C, 'C', D },
{ A, 'A', E },
{ E, 'C', F },
{ F, 'B', D },
{ D, 'D', A }
};
为什么这里变成六个状态了呢?因为我发现条件变量不像信号量,信号量构成的条件可以很轻松地实现抢占先机这件事,因此我只需要信号量有了之后就可以直接去让对应线程抢占打印时机就好,但是条件变量需要满足条件,因此我可能就需要设计更多的状态以描述完整的过程:
在有了这个状态机之后,我们就可以很轻松地写出代码了:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
enum { A = 1, B, C, D, E, F };
struct rule
{
int from, ch, to;
};
struct rule rules[] = {
{ A, 'A', B },
{ B, 'B', C },
{ C, 'C', D },
{ A, 'A', E },
{ E, 'C', F },
{ F, 'B', D },
{ D, 'D', A }
};
int current = A, quota = 1;
pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
int next(char ch)
{
for (int i = 0; i < 7; i++) {
struct rule* rule = &rules[i];
if (rule->from == current && rule->ch == ch) {
return rule->to;
}
}
return 0;
}
int can_print(char ch)
{
return next(ch) != 0 && quota > 0;
}
void seq_before(char ch)
{
pthread_mutex_lock(&lk);
while (!can_print(ch)) {
pthread_cond_wait(&cv, &lk);
}
quota--;
pthread_mutex_unlock(&lk);
}
void seq_after(char ch)
{
pthread_mutex_lock(&lk);
quota++;
current = next(ch);
assert(current);
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&lk);
}
const char roles[] = ".AAABBBBBBCCCCCDDD";
void* seq_thread(void* id)
{
char role = roles[*(int*)id];
while (1) {
seq_before(role);
putchar(role);
seq_after(role);
}
pthread_exit(NULL);
}
int main()
{
pthread_t thr[50];
int len = strlen(roles);
int n[50];
for (int i = 1; i <= len; i++) n[i] = i;
for (int i = 1; i <= len; i++) {
pthread_create(&thr[i], NULL, seq_thread, (void*)&n[i]);
}
for (int i = 1; i <= len; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
roles数组里存的就是ABCD对应线程的个数,这样的话即便换成别的字符,也能照样打印出来
结果是对的,不过可以发现结果貌似只有ABCD这一种,这个结果其实没有问题,因为我们在选择下一个状态进行激活的时候一直是按照顺序来的,如果我们把AE放到AB之前就会发生变化,比如这样:
所以想让打印序列随机,可以加一个random在这二者之间随机选择,不过这个事情应该还可以通过继续优化条件变量来实现,不过这里就先不尝试了。
(3).还是合法括号序列
利用条件变量实现合法括号序列实际上就是解决这一系列计数类型的生产者—消费者问题,因此我们只需要使用一个cnt变量存储当前的括号嵌套深度就行,这个听起来不是很难实现,因此有了下面的代码:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
#define NUMS 10
int n, cnt = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
#define PRODUCE (cnt < n)
#define CONSUME (cnt > 0)
void* Tproduce()
{
while (1) {
pthread_mutex_lock(&lock);
while (!PRODUCE) {
pthread_cond_wait(&cv, &lock);
}
printf("(");
cnt++;
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&lock);
}
pthread_exit(NULL);
}
void* Tconsume()
{
while (1) {
pthread_mutex_lock(&lock);
while (!CONSUME) {
pthread_cond_wait(&cv, &lock);
}
printf(")");
cnt--;
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&lock);
}
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
assert(argc == 2);
n = atoi(argv[1]);
pthread_t thr[NUMS * 2];
for (int i = 0; i < NUMS; i++) {
pthread_create(&thr[i * 2], NULL, Tproduce, NULL);
pthread_create(&thr[i * 2 + 1], NULL, Tconsume, NULL);
}
for (int i = 0; i < NUMS; i++) {
pthread_join(thr[i * 2], NULL);
pthread_join(thr[i * 2 + 1], NULL);
}
return 0;
}
(4).还是打印很多鱼
用条件变量做完了第一个例子之后,最后一个例子其实就已经迎刃而解了,我们只要稍微变换一下状态机就行:
其实连状态的数量都是一样的,只要改一改输出的信号就行:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <string.h>
enum { A = 1, B, C, D, E, F };
struct rule
{
int from, ch, to;
};
struct rule rules[] = {
{ A, '<', B },
{ B, '>', C },
{ C, '<', D },
{ A, '>', E },
{ E, '<', F },
{ F, '>', D },
{ D, '_', A }
};
int current = A, quota = 1;
pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
int next(char ch)
{
for (int i = 0; i < 7; i++) {
struct rule* rule = &rules[i];
if (rule->from == current && rule->ch == ch) {
return rule->to;
}
}
return 0;
}
int can_print(char ch)
{
return next(ch) != 0 && quota > 0;
}
void seq_before(char ch)
{
pthread_mutex_lock(&lk);
while (!can_print(ch)) {
pthread_cond_wait(&cv, &lk);
}
quota--;
pthread_mutex_unlock(&lk);
}
void seq_after(char ch)
{
pthread_mutex_lock(&lk);
quota++;
current = next(ch);
assert(current);
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&lk);
}
const char roles[] = ".<<<<<<<>>>>___";
void* seq_thread(void* id)
{
char role = roles[*(int*)id];
while (1) {
seq_before(role);
putchar(role);
seq_after(role);
}
pthread_exit(NULL);
}
int main()
{
pthread_t thr[50];
int len = strlen(roles);
int n[50];
for (int i = 1; i <= len; i++) n[i] = i;
for (int i = 1; i <= len; i++) {
pthread_create(&thr[i], NULL, seq_thread, (void*)&n[i]);
}
for (int i = 1; i <= len; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
与例1的实现几乎一致,两次的代码都参考了jyy老师的代码,效果如下,条件变量真的是并发程序的优雅实现方式:
总结
这次的实践让我彻底明白一件事:并发程序真的非常难写,对于人类来说,一个在任何时候可能切换,并且情况难以确定的程序是很不自然的,我们很难一口气列举出所有的情况(当然这好像是个NPC问题),不过对比相同例子下信号量和条件变量的实现,条件变量能够以一个更清晰地框架让我们完成整个并发程序的编写过程。
原本我以为并发程序只要加上锁了,就可以避免掉大部分问题了,结果出现了一堆死锁,整个程序动都动不起来,结果去掉锁之后又出现了大量同步问题,所以写并发程序真的需要付出相当大的努力。
后续(2024.3.26),我已经注意到之前的枚举打印鱼的所有情况的状态转移图可能是有问题的,因此根据在条件变量中使用了新的状态转移图重新写出了下面的代码:
#include <stdio.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdlib.h>
sem_t ba, ea, cb, fe, d, ad;
void init()
{
sem_init(&ba, 0, 0);
sem_init(&ea, 0, 0);
sem_init(&cb, 0, 0);
sem_init(&fe, 0, 0);
sem_init(&d, 0, 0);
sem_init(&ad, 0, 1);
}
void* Ta()
{
while (1) {
sem_wait(&ad);
int ran = rand() % 2;
if (ran) {
printf("<");
sem_post(&ba);
}
else {
printf(">");
sem_post(&ea);
}
}
pthread_exit(NULL);
}
void* Tb()
{
while (1) {
sem_wait(&ba);
printf(">");
sem_post(&cb);
}
pthread_exit(NULL);
}
void* Te()
{
while (1) {
sem_wait(&ea);
printf("<");
sem_post(&fe);
}
pthread_exit(NULL);
}
void* Tc()
{
while (1) {
sem_wait(&cb);
printf("<");
sem_post(&d);
}
pthread_exit(NULL);
}
void* Tf()
{
while (1) {
sem_wait(&fe);
printf(">");
sem_post(&d);
}
pthread_exit(NULL);
}
void* Td()
{
while (1) {
sem_wait(&d);
printf("_");
sem_post(&ad);
}
pthread_exit(NULL);
}
int main()
{
init();
void* (*funcs[6])() = {Ta, Tb, Tc, Td, Te, Tf};
pthread_t thr[6];
for (int i = 0; i < 6; i++) {
pthread_create(&thr[i], NULL, funcs[i], NULL);
}
for (int i = 0; i < 6; i++) {
pthread_join(thr[i], NULL);
}
return 0;
}
经过测试,这段代码是可以顺利运行的,这个也给了我一个启示:对于存在多路分支,但是每轮过程当中只可能有一条在运行的计算图问题,实际上应该将多条路径的最终汇聚点前的边先汇聚,例如在这里将df和dc两条边合并成了d,也就是对应只有一个信号量d,这样就可以完成整个同步过程了
参考资料
- 1.同步:生产者-消费者与条件变量 (算法并行化;万能同步方法)
- 2.同步:信号量与哲♂学家吃饭问题 (信号量的正确打开方式)
- 3.C++ 多线程编程(二):pthread的基本使用
- 4.pthread_mutex_lock
- 5.Mutex lock for Linux Thread Synchronization