【APUE】补充 — 基于管道的线程池

目录

一、引言

二、代码实现 

三、思考 


一、引言

在线程章节的 3.2 部分,我们曾经提到过线程池的实现

在当时的代码中,我们仅仅用的一个 int 类型的变量来表示这个“池”,用来存放任务

显然这个池太小了,如果下游线程很多,可能会出现以下情况:

我们只需要将任务池的容量增大点,就可以很好地减少上述提到的多次调度带来的上下文切换开销

池需要用一个数据结构来维护其池中的任务(数据),我们选用队列进行实现

其实队列这个数据结构的特征和管道非常类似,数据从队列尾入队,从队列首出队数据写入管道的写端,然后从管道的读端读取数据

我们接下来将动手实现一个队列(管道),并基于该管道扩充池的容量。其实内核也提供了现成的管道,我们不直接使用内核提供的管道主要有以下两点考虑:

  • 内核提供的管道可用于进程间通信,线程间通信没必要用内核提供的机制,用的话就有点儿大材小用了
  • 自己实现管道能够深入了解管道的特点,为后续讲解进程间通信做铺垫(到时候会用到内核提供的管道)

二、代码实现 

mypipe.h,暴露接口

#ifndef MYPIPE_H__
#define MYPIPE_H__

#define PIPESIZE 1024
#define MYPIPE_READ 0b00000001UL
#define MYPIPE_WRITE 0b00000010UL

typedef void mypipe_t;

mypipe_t* mypipe_init(void);	// 创建(初始化)一个管道

int mypipe_register(mypipe_t *, int opmap);	// 注册用户身份

int mypipe_unregister(mypipe_t *, int opmap);	// 取消注册用户身份

int mypipe_read(mypipe_t *, int *buf, size_t count);	// 读管道

int mypipe_write(mypipe_t *, const int *buf, size_t count);	// 写管道

int mypipe_destroy(mypipe_t *);	// 销毁管道	

#endif

mypipe.c,实现接口。注意如何将在 mypipe.h 中隐藏的数据结构解除隐藏

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "mypipe.h"

struct mypipe_st {	// 并发队列
	int head;	// 指向队头
	int tail;	// 指向队尾
	int data[PIPESIZE];	// 存放数据的地方
	int datasize;	// 队列中有效数据的个数
	pthread_mutex_t mutex;	// 互斥量
	pthread_cond_t cond;	// 条件变量
	int count_rd, count_wr;	// 读者写者计数
				// 从这里看出,我们需要用户指定其读者/写者的身份
};

mypipe_t * mypipe_init(void) {	// 初始化一个管道
	struct mypipe_st *me;
	me = malloc(sizeof(*me));
	if (me == NULL)
		return NULL;
	me->head = 0;
	me->tail = 0;
	me->datasize = 0;
	me->count_rd = 0;	// 读者个数为0
	me->count_wr = 0;	// 写者个数为0
	pthread_mutex_init(&me->mutex, NULL);
	pthread_cond_init(&me->cond, NULL);
	return me;
}

int mypipe_register(mypipe_t *ptr, int opmap) {	// 用户通过该函数指定其身份
	struct mypipe_st * me = ptr;	// 之前在.h文件中隐藏了ptr所表征的数据结构,在这里取消隐藏
	pthread_mutex_lock(&me->mutex);	// 可能多个线程同时注册身份,需要互斥
	if (opmap & MYPIPE_READ)	// 将宏看成位图,注意位图操作
		me->count_rd++;
	if (opmap & MYPIPE_WRITE)
		me->count_wr++;
	pthread_mutex_unlock(&me->mutex);	
	return 0;
}

int mypipe_unregister(mypipe_t *ptr, int opmap) {	// 用户通过该函数取消注册其身份
	struct mypipe_st * me = ptr;
	pthread_mutex_lock(&me->mutex);
	if (opmap & MYPIPE_READ)
		me->count_rd--;
	if (opmap & MYPIPE_WRITE)
		me->count_wr--;
	pthread_cond_broadcast(&me->cond);	// 可能读者或者写者被减为0了,需要通知一下read及write
	pthread_mutex_unlock(&me->mutex);
	return 0;
}

int mypipe_read(mypipe_t * ptr, int *buf, size_t count) {
	struct mypipe_st * me = ptr;
	pthread_mutex_lock(&me->mutex);
	while (me->datasize <= 0 && me->count_wr > 0)	// 当管道中没有数据,但是还有写者,就继续等待
	{
		pthread_cond_wait(&me->cond, &me->mutex);	// 等待管道中有数据
								// 等待写者数量变为0
	}
	if (me->datasize <= 0 && me->count_wr <= 0)
	{
		pthread_mutex_unlock(&me->mutex);	// 没有写者且管道中没有数据,就直接返回读取到的字节数为0
		return 0;
	}

	int i;
	for (i = 0; i < count; ++i) {	// 读取
		if (me->datasize <= 0)
			break;
		*(buf+i) = me->data[me->head];
		me->head = (me->head + 1)%PIPESIZE;	// 读出来了,相当于队列首出队了一个元素
		me->datasize--;
	}
	pthread_cond_broadcast(&me->cond);	// 告诉写者能写了
	pthread_mutex_unlock(&me->mutex);
	return i;
}

int mypipe_write(mypipe_t *ptr, const int *buf, size_t count) {
	struct mypipe_st * me = ptr;
	pthread_mutex_lock(&me->mutex);
	while (me->datasize >= PIPESIZE && me->count_rd > 0)	// 当管道满,但是还有读者,就继续等待 
		pthread_cond_wait(&me->cond, &me->mutex);	// 等待读者读出数据,使管道不满
								// 等待读者数量变为0

	if (me->datasize >= PIPESIZE && me->count_rd <= 0)	// 没有读者后,且管道已经满了,就没必要继续写了,直接返回
	{
		pthread_mutex_unlock(&me->mutex);
		return 0;
	}

	int i;
	for (i = 0; i < count; ++i) {	// 写入
		if (me->datasize == PIPESIZE)
			break;
		me->data[me->tail] = *(buf+i);
		me->tail = (me->tail + 1)%PIPESIZE;
		me->datasize++;
	}
	pthread_cond_broadcast(&me->cond);	// 告诉读者能读了
	pthread_mutex_unlock(&me->mutex);
	return i;
}

int mypipe_destroy(mypipe_t * ptr) {
	struct mypipe_st * me = ptr;
	pthread_mutex_destroy(&me->mutex);
	pthread_cond_destroy(&me->cond);
	free(ptr);
	return 0;
}

main.c,演示用户如何使用接口。main 首先创建 10 个线程(这 10 个线程负责从池中获取数据并判断是否为质数),然后 main 线程自身源源不断往池中写入数据。我们需要用我们上述写的管道来表征这个池,注意如何使用我们的接口:

  1. 创建管道
  2. 往管道写入数据前需要注册写者身份,写入完毕后需要取消注册的身份
  3. 从管道读取数据前需要注册读者身份,读取完毕后需要取消注册的身份
  4. 管道用完后需要销毁管道
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "mypipe.h"

#define NUM_THREADS 10
#define START_NUM 30000000
#define END_NUM 30000200

// 质数判断函数
int is_prime(int number) {
	if (number <= 1) return 0;
	for (int i = 2; i * i <= number; i++) {
		if (number % i == 0) return 0;
	}
	return 1;
}

// 线程函数
void *reader_function(void *arg) {
	mypipe_t *pipe = (mypipe_t *)arg;
	int number;

	mypipe_register(pipe, MYPIPE_READ);

	while (mypipe_read(pipe, &number, 1) > 0) {
		if (is_prime(number)) {
			printf("Prime number: %d\n", number);
		}
	}

	mypipe_unregister(pipe, MYPIPE_READ);
	return NULL;
}

int main() {
	pthread_t threads[NUM_THREADS];
	mypipe_t *pipe = mypipe_init();

	// 创建读线程
	for (int i = 0; i < NUM_THREADS; i++) {
		pthread_create(&threads[i], NULL, reader_function, pipe);
	}

	// 主线程写入数据
	mypipe_register(pipe, MYPIPE_WRITE);
	for (int i = START_NUM; i <= END_NUM; i++) {
		mypipe_write(pipe, &i, 1);
	}
	mypipe_unregister(pipe, MYPIPE_WRITE);
	// 等待所有读线程完成
	for (int i = 0; i < NUM_THREADS; i++) {
		pthread_join(threads[i], NULL);
	}

	// 销毁管道
	mypipe_destroy(pipe);
	return 0;
}

结果示例如下

我们判断的是从 30000000 到 30000200 之间的质数,结果和线程章节的 3.2 中所实现代码的运行结果一致,说明代码没毛病 

我们从我们自己创建的管道,可以看出管道的如下几个特点

  • 管道通信是单工的。一方作为读者,另一方作为写者。写永远写在尾部,读永远是从首部读
  • 管道必须凑齐读写双方才能正常运行。注意 mypipe_read 和 mypipe_write 中的 while 循环条件:只要缺失读者或者写者,另一方就可能直接返回而不会等待新的数据(哪怕后面可能有新的读者或者写者加入并读写管道)
  • 管道内部自带同步与互斥机制

三、思考 

上述代码的一个缺陷是:没有办法约束用户的行为!

比如我是一个用户,我注册了写者身份后却调用的 write......

那可不妥!!一种好的思路是引入权限的概念,并对上述接口再封装一层......

上述蓝色字体表示封装出来的新的接口,以供用户调用绿色字体介绍了这样封装的思想和逻辑。其实这就是 UNIX “一切皆文件”思想的部分实现方式!即:就算最底层可能是完全不一样的东东(操作管道、操作设备、操作普通文件......),也会再封装一层,并提供通用的接口(如 open、read、write、close 和文件描述符 fd......)。这样一来,在用户的视角里,操作文件的接口也可以操作很多不一样的东东,因此“一切皆文件”

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/161743.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录 Day49 单调栈01 LeetCode LeetCodeT739每日温度 T496 下一个最大元素I

前言 折磨的死去活来的动态规划终于结束啦,今天秋秋给大家带来两题非常经典的单调栈问题,可能你不清楚单调栈是什么,可以用来解决什么问题,今天我们就来一步一步的逐渐了解单调栈,到能够灵活使用单调栈.注意以下讲解中&#xff0c;顺序的描述为 从栈头到栈底的顺序 什么时候用单…

3D建模基础教程:编辑样条线【子层级】

了解子层级编辑样条线 在3D建模中&#xff0c;样条线是创建各种形状和曲线的重要工具。而编辑样条线是3D建模过程中不可或缺的一部分。今天&#xff0c;我们将一起学习如何编辑样条线&#xff0c;以及了解其子层级的相关知识。 样条线的子层级介绍 样条线的子层级包括&#xff…

Java的IO流-缓冲流

字节缓冲流 package com.itheima.d2;import java.io.*;public class Test1 {public static void main(String[] args) {try (InputStream is new FileInputStream("IO/src/itheima01.txt");//1、定义一个字节缓冲输入流包装原始的字节输入流InputStream bis new Bu…

任你五花八门预训练方法,我自监督学习依然能打!

长时间没看论文&#xff0c;外面已经发展成这样了&#xff1f; 以下都是新paper&#xff0c;挑了几个感兴趣的&#xff0c;一起粗略看看吧~ Battle of the Backbones: A Large-Scale Comparison of Pretrained Models across Computer Vision Tasks GitHub | https://github.…

linux基本指令总结--文件和目录

前言&#xff1a; 想要学好Linux操作系统&#xff0c;理解并熟悉一些基本的指令是必要的&#xff0c;下面我将整理出关于文件和目录操作的一些基本指令和用法&#xff0c;我的linux环境部署在服务器端&#xff0c;使用xshell软件进行远程操作。 本章指令整合&#xff1a; ls查…

十个一手app拉新地推拉新推广接单平台,放单/接任务渠道

做过地推拉新的朋友一定都非常清楚&#xff0c;app拉新推广一手接单平台&#xff0c;和非一手接任务平台之间的收益差&#xff0c;可以用天壤之别来形容。那么一手app拉新渠道应该怎么找&#xff1f;下面这十个常见的地推拉新app接单平台&#xff0c;一定要收藏。 1. 聚量推客…

学习c#的第十四天

目录 C# 接口&#xff08;Interface&#xff09; 接口的特点 定义接口 接口继承 接口和抽象类的区别 C# 命名空间&#xff08;Namespace&#xff09; using 关键字 定义命名空间 嵌套命名空间 C# 接口&#xff08;Interface&#xff09; 接口定义了所有类继承接口时应…

036、目标检测-锚框

之——对边缘框的简化 目录 之——对边缘框的简化 杂谈 正文 1.锚框操作 2.IoU交并比 3.锚框标号 4.非极大值抑制 5.实现 拓展 杂谈 边缘框这样一个指定roi区域的操作对卷积神经网络实际上是很不友好的&#xff0c;这可能会对网络感受野提出一些特定的要求&#xff0…

HUAWEI华为笔记本MateBook X 2021款i5集显(EULD-WFH9,WXX9)原装出厂Windows11系统工厂模式包

下载链接&#xff1a;https://pan.baidu.com/s/1gQ_O203SSm83Nc-zDk1iNA?pwd4exz 提取码&#xff1a;4exz 系统带F10一键智能还原功能隐藏恢复分区、所有驱动、Office办公软件、华为电脑管家等预装程序 所需要工具&#xff1a;32G或以上的U盘 文件格式&#xff1a;zip …

智慧工地APP全套源码,智慧工地云平台

智慧工地平台 &#xff0c;智慧工地源码&#xff0c;智慧工地APP全套源码 智慧工地以施工现场风险预知和联动预控为目标&#xff0c;将智能AI、传感技术、人像识别、监控、虚拟现实、物联网、5G、大数据、互联网等新一代科技信息技术植入到建筑、机械、人员穿戴设施、场地进出关…

Linux下查看pytorch运行时真正调用的cuda版本

一般情况我们会安装使用多个cuda版本。而且pytorch在安装时也会自动安装一个对应的版本。 正确查看方式&#xff1a; 想要查看 Pytorch 实际使用的运行时的 cuda 目录&#xff0c;可以直接输出 cpp_extension.py 中的 CUDA_HOME 变量。 import torch import torch.utils imp…

Nginx反向代理和负载均衡

1.反向代理 反向代理&#xff08;Reverse Proxy&#xff09;方式是指以代理服务器来接受internet上的连接请求&#xff0c;然后将请求转发给内部网络上的服务器&#xff0c;并将从服务器上得到的结果返回给internet上请求连接的客户端&#xff0c;此时代理服务器对外就表现为一…

springboot+vue+element简单实现教学课程申报管理系统

目录 一、项目预览 二、项目效果图及说明 1.项目说明 1.登录 2.欢迎页 3.教师管理 4.课程申报 ​5.管理员管理 三、代码实现 1.后端项目结构图 2.数据库表脚本 3.路由配置 四、总结 一、项目预览 在线预览&#xff1a;点击访问其他项目访问&#xff1a;点击访问后端实…

Java学习之路 —— Java高级

文章目录 前言1. 单元测试2. 反射2.1 获取Class对象的三种方式2.2 获取类的构造器的方法2.3 获取类的成员变量2.4 获取类的成员方法2.5 反射的作用 3. 注解3.1 自定义注解3.2 注解的原理3.3 元注解3.4 注解的解析 4. 动态代理5. 总结 前言 终于走到新手村的末端了&#xff0c;…

青少年CTF-WEB-Flag在哪里?

题目环境&#xff1a;F12查看源代码得到flag&#xff1a;qsnctf{1167716c-54f0-47da-baed-49e3b08dfaeb} 此题主要考察F12查看源代码的使用

java.net.UnknownHostException: eureka

java.net.UnknownHostException: eureka 哦。HOST漏了 #linux /etc/hosts #windows C:\Windows\System32\drivers\etc\hosts 127.0.0.1 eureka7000 127.0.0.1 eureka7001 127.0.0.1 eureka7002

MATLAB画图分辨率、图像大小研究

MATLAB画图分辨率、图像大小研究 Figure属性中 InnerPosition Position OuterPosition区别画图与打印的分辨率和图像大小研究首先明确两个概念&#xff1a;MATLAB实操画图 Figure属性中 InnerPosition Position OuterPosition区别 在画图的时候&#xff0c;我们经常需要设置Fi…

Leetcode—142.环形链表II【中等】

2023每日刷题&#xff08;三十三&#xff09; Leetcode—142.环形链表II 实现代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode *detectCycle(struct ListNode *head) {struct ListNode* …

python中sklearn库在数据预处理中的详细用法,及5个常用的Scikit-learn(通常简称为 sklearn)程序代码示例

文章目录 前言1. 数据清洗&#xff1a;使用 sklearn.preprocessing 中的 StandardScaler 和 MinMaxScaler 进行数据规范化。2. 缺失值处理&#xff1a;使用 sklearn.impute 中的 SimpleImputer 来填充缺失值。3. 数据编码&#xff1a;使用 sklearn.preprocessing 中的 OneHotEn…