前言
这是当时选修《操作系统原理》时需要做的一个实验项目,也是三级项目,选题选的是第二章有关并行计算的实验,这个项目有匿名版的PPT和视频,可以私我,发送“基于并行计算的的单词统计”即可获取整个项目的压缩包。
1.软硬件环境
-
操作系统:ubuntu22.04.3LTS
-
硬件要求:3G内存、4个处理器
-
开发工具:Vscode
-
第三方库和依赖项:<stdio.h> 、<pthread.h>、<ctype.h> 、<stdlib.h>、 、、
2.实验名称
基于分布并行计算的多线程单词统计工具
2.1实验解决方案
- 区分单词原则:
凡是一个非字母或数字的字符跟在字母或数字的后面,那么这个字母或数字就是单词的结尾。 - 总单词数访问互斥原则:
允许线程使用互斥锁来修改临界资源,确保线程间的同步与协作。如果两个线程需要安全的共享一个公共计数器,需要把公共计数器加锁,线程需要访问称为互斥锁的变量,它可以使线程间很好的合作,避免对于资源的访问冲突。 - 基于线程的分布并行计算原则:
在count_words函数使用了一个线程池来并行处理文件中的数据。这个函数首先将文件分割成多个部分,然后创建一个线程池,每个线程处理文件的一个部分。每个线程都有一个本地的单词计数器,当它完成对其部分的处理后,它会更新全局的单词计数器。
2.2 程序代码
v1.0——word_count_base.cpp
#include <stdio.h>
#include <pthread.h>
#include <ctype.h>
#include <stdlib.h>
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int total_words = 0; // 单词总数
// 统计单词数
void *count_words(void *f)
{
char *filename = (char *)f; // 文件名
FILE *fp; // 文件指针
int c, prevc = '\0'; // c为当前字符,prevc为前一个字符
if ((fp = fopen(filename, "r")) != NULL)
{ // 打开文件
while ((c = getc(fp)) != EOF)
{ // 读取文件
if (!isalnum(c) && isalnum(prevc))
{ // 如果当前字符不是字母或数字,前一个字符是字母或数字
pthread_mutex_lock(&counter_lock); // 加锁
total_words++; // 单词数加1
pthread_mutex_unlock(&counter_lock); // 解锁
}
prevc = c; // 更新前一个字符
}
fclose(fp); // 关闭文件
}
else
{ // 打开文件失败
perror(filename); // 打印错误信息
}
return NULL;
}
int main(int ac, char *av[])
{
pthread_t t1, t2; // 两个线程
if (ac != 3)
{ // 参数错误
printf("Usage:%s file1 file2\n", av[0]); // 打印使用方法
exit(1);
}
pthread_create(&t1, NULL, count_words, av[1]); // 创建线程1
pthread_create(&t2, NULL, count_words, av[2]); // 创建线程2
pthread_join(t1, NULL); // 等待线程1结束
pthread_join(t2, NULL); // 等待线程2结束
printf("Total words: %d\n", total_words); // 打印单词总数
return 0;
}
v1.1——word_count_with_time.cpp
#include <stdio.h> // 引入标准输入输出库
#include <pthread.h> // 引入线程库
#include <ctype.h> // 引入isalnum函数
#include <stdlib.h> // 引入exit函数
#include <chrono> // 引入时间库
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int total_words = 0; // 单词总数
// 统计单词数
void *count_words(void *f)
{
char *filename = (char *)f; // 文件名
FILE *fp; // 文件指针
int c, prevc = '\0'; // c为当前字符,prevc为前一个字符
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
if ((fp = fopen(filename, "r")) != NULL)
{ // 打开文件
while ((c = getc(fp)) != EOF)
{ // 读取文件
if (!isalnum(c) && isalnum(prevc))
{ // 如果当前字符不是字母或数字,前一个字符是字母或数字
pthread_mutex_lock(&counter_lock); // 加锁
total_words++; // 单词数加1
pthread_mutex_unlock(&counter_lock); // 解锁
}
prevc = c; // 更新前一个字符
}
fclose(fp); // 关闭文件
}
else
{ // 打开文件失败
perror(filename); // 打印错误信息
}
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Thread time taken: %f seconds\n", diff.count()); // 打印所用时间
return NULL;
}
int main(int ac, char *av[])
{
pthread_t t1, t2; // 两个线程
if (ac != 3)
{ // 参数错误
printf("Usage:%s file1 file2\n", av[0]); // 打印使用方法
exit(1);
}
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
pthread_create(&t1, NULL, count_words, av[1]); // 创建线程1
pthread_create(&t2, NULL, count_words, av[2]); // 创建线程2
pthread_join(t1, NULL); // 等待线程1结束
pthread_join(t2, NULL); // 等待线程2结束
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Total words: %d\n", total_words); // 打印单词总数
printf("Time taken: %f seconds\n", diff.count()); // 打印所用时间
return 0;
}
v1.2——word_count_thread_arr.cpp
#include <stdio.h> // 引入标准输入输出库
#include <pthread.h> // 引入线程库
#include <ctype.h> // 引入isalnum函数
#include <stdlib.h> // 引入exit函数
#include <chrono> // 引入时间库
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int total_words = 0; // 单词总数
// 统计单词数
void *count_words(void *f)
{
char *filename = (char *)f; // 文件名
FILE *fp; // 文件指针
int c, prevc = '\0'; // c为当前字符,prevc为前一个字符
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
if ((fp = fopen(filename, "r")) != NULL)
{ // 打开文件
while ((c = getc(fp)) != EOF)
{ // 读取文件
if (!isalnum(c) && isalnum(prevc))
{ // 如果当前字符不是字母或数字,前一个字符是字母或数字
pthread_mutex_lock(&counter_lock); // 加锁
total_words++; // 单词数加1
pthread_mutex_unlock(&counter_lock); // 解锁
}
prevc = c; // 更新前一个字符
}
fclose(fp); // 关闭文件
}
else
{ // 打开文件失败
perror(filename); // 打印错误信息
}
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Thread time taken: %f seconds\n", diff.count()); // 打印所用时间
return NULL;
}
int main(int ac, char *av[])
{
if (ac < 2)
{ // 参数错误
printf("Usage:%s file1 [file2 ...]\n", av[0]); // 打印使用方法
exit(1);
}
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
int num_files = ac - 1; // 文件数量
pthread_t *threads = new pthread_t[num_files]; // 创建线程数组
for (int i = 0; i < num_files; ++i)
{
pthread_create(&threads[i], NULL, count_words, av[i + 1]); // 创建线程
}
for (int i = 0; i < num_files; ++i)
{
pthread_join(threads[i], NULL); // 等待线程结束
}
delete[] threads; // 删除线程数组
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Total words: %d\n", total_words); // 打印单词总数
printf("Time taken: %f seconds\n", diff.count()); // 打印所用时间
return 0;
}
v1.3——word_count_thread_arr_better.cpp
#include <stdio.h> // 引入标准输入输出库
#include <pthread.h> // 引入线程库
#include <ctype.h> // 引入isalnum函数
#include <stdlib.h> // 引入exit函数
#include <chrono> // 引入时间库
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int total_words = 0; // 单词总数
// 统计单词数
void *count_words(void *f)
{
char *filename = (char *)f; // 文件名
FILE *fp; // 文件指针
int c, prevc = '\0'; // c为当前字符,prevc为前一个字符
int local_word_count = 0; // 本地单词数
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
std::time_t start_time = std::chrono::system_clock::to_time_t(start);
printf("%-30s: Start time: %s", filename, std::ctime(&start_time)); // 打印开始时间
if ((fp = fopen(filename, "r")) != NULL)
{ // 打开文件
while ((c = getc(fp)) != EOF)
{ // 读取文件
if (!isalnum(c) && isalnum(prevc))
{ // 如果当前字符不是字母或数字,前一个字符是字母或数字
pthread_mutex_lock(&counter_lock); // 加锁
total_words++; // 单词数加1
pthread_mutex_unlock(&counter_lock); // 解锁
}
if (!isalnum(c) && isalnum(prevc))
{
local_word_count++; // 单词数加1
}
prevc = c; // 更新前一个字符
}
fclose(fp); // 关闭文件
}
else
{ // 打开文件失败
perror(filename); // 打印错误信息
}
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); // 计算时间差
int milliseconds = duration.count() % 1000; // 获取毫秒
int seconds = (duration.count() / 1000) % 60; // 获取秒
int minutes = (duration.count() / 1000) / 60; // 获取分钟
printf("%-30s: Word count: %-30d Thread time taken: %d minutes %d seconds %d milliseconds\n", filename, local_word_count, minutes, seconds, milliseconds); // 打印文件名、单词数和所用时间
return NULL;
}
int main(int ac, char *av[])
{
if (ac < 2)
{ // 参数错误
printf("Usage:%s file1 [file2 ...]\n", av[0]); // 打印使用方法
exit(1);
}
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
int num_files = ac - 1; // 文件数量
pthread_t *threads = new pthread_t[num_files]; // 创建线程数组
for (int i = 0; i < num_files; ++i)
{
pthread_create(&threads[i], NULL, count_words, av[i + 1]); // 创建线程
}
for (int i = 0; i < num_files; ++i)
{
pthread_join(threads[i], NULL); // 等待线程结束
}
delete[] threads; // 删除线程数组
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Total words: %d\n", total_words); // 打印单词总数
printf("Time taken: %f seconds\n", diff.count()); // 打印所用时间
return 0;
}
v1.4——word_count_final.cpp
#include <stdio.h> // 引入标准输入输出库
#include <pthread.h> // 引入线程库
#include <ctype.h> // 引入isalnum函数
#include <stdlib.h> // 引入exit函数
#include <chrono> // 引入时间库
#include <thread> // 引入线程库
#include <vector> // 引入向量库
pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int total_words = 0; // 单词总数
// 统计单词数
void *count_words(void *f)
{
char *filename = (char *)f; // 文件名
int local_word_count = 0; // 本地单词数
FILE *fp; // 文件指针
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
std::time_t start_time = std::chrono::system_clock::to_time_t(start);
printf("%-30s: Start time: %s", filename, std::ctime(&start_time)); // 打印开始时间
if ((fp = fopen(filename, "r")) != NULL)
{ // 打开文件
fseek(fp, 0, SEEK_END);
long file_size = ftell(fp); // 获取文件大小
fseek(fp, 0, SEEK_SET);
char *file_content = new char[file_size];
fread(file_content, 1, file_size, fp); // 读取整个文件到内存中
fclose(fp); // 关闭文件
int num_threads = std::thread::hardware_concurrency(); // 获取硬件支持的并发线程数
long part_size = file_size / num_threads; // 计算每个线程需要处理的部分大小
std::vector<std::thread> threads; // 线程池
std::vector<int> local_word_counts(num_threads, 0); // 每个线程的单词数
for (int i = 0; i < num_threads; ++i)
{
threads.push_back(std::thread([=, &local_word_counts]()
{
int c, prevc = '\0'; // c为当前字符,prevc为前一个字符
long start_pos = i * part_size; // 计算这个线程需要处理的部分的开始位置
long end_pos = (i == num_threads - 1) ? file_size : (i + 1) * part_size; // 计算这个线程需要处理的部分的结束位置
for (long pos = start_pos; pos < end_pos; ++pos)
{ // 读取文件
c = file_content[pos];
if (!isalnum(c) && isalnum(prevc))
{ // 如果当前字符不是字母或数字,前一个字符是字母或数字
pthread_mutex_lock(&counter_lock); // 加锁
total_words++; // 单词数加1
pthread_mutex_unlock(&counter_lock); // 解锁
local_word_counts[i]++; // 单词数加1
}
prevc = c; // 更新前一个字符
} }));
}
for (int i = 0; i < num_threads; ++i)
{
threads[i].join(); // 等待线程完成
local_word_count += local_word_counts[i]; // 累加本地单词数
}
delete[] file_content; // 释放内存
}
else
{ // 打开文件失败
perror(filename); // 打印错误信息
}
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::time_t end_time = std::chrono::system_clock::to_time_t(end);
printf("%-30s: End time: %s", filename, std::ctime(&end_time)); // 打印开始时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start); // 计算时间差
int milliseconds = duration.count() % 1000; // 获取毫秒
int seconds = (duration.count() / 1000) % 60; // 获取秒
int minutes = (duration.count() / 1000) / 60; // 获取分钟
printf("%-30s: Word count: %-30d Thread time taken: %d minutes %d seconds %d milliseconds\n", filename, local_word_count, minutes, seconds, milliseconds); // 打印文件名、单词数和所用时间
return NULL;
}
int main(int ac, char *av[])
{
if (ac < 2)
{ // 参数错误
printf("Usage:%s file1 [file2 ...]\n", av[0]); // 打印使用方法
exit(1);
}
auto start = std::chrono::high_resolution_clock::now(); // 记录开始时间
int num_files = ac - 1; // 文件数量
pthread_t *threads = new pthread_t[num_files]; // 创建线程数组
for (int i = 0; i < num_files; ++i)
{
pthread_create(&threads[i], NULL, count_words, av[i + 1]); // 创建线程
}
for (int i = 0; i < num_files; ++i)
{
pthread_join(threads[i], NULL); // 等待线程结束
}
delete[] threads; // 删除线程数组
auto end = std::chrono::high_resolution_clock::now(); // 记录结束时间
std::chrono::duration<double> diff = end - start; // 计算时间差
printf("Total words: %d\n", total_words); // 打印单词总数
printf("Time taken: %f seconds\n", diff.count()); // 打印所用时间
return 0;
}
3.测试(测试用例设计、运行结果)
-
v1.0——word_count_base.cpp
-
v1.1——word_count_with_time_count.cpp
-
v1.2——word_count_thread_arr.cpp
-
v1.3——word_count_thread_arr_better.cpp
-
v.1.4——word_count_final.cpp
5.系统特色以及可扩展点
5.1 特色
这是是一个使用 POSIX 线程(pthread)的程序,它能并发地计算多个文件中的单词数及单次总数。每个文件由一个独立的线程处理,所有线程都能够更新全局变量 total_words 来累计所有文件中的单词总数。其中的互斥锁 counter_lock 用于同步对共享变量 total_words 的访问。
5.2 可扩展点
- 性能优化:可以使用条件变量或其他同步机制,减少互斥锁的使用,以提高并发性。
- 动态线程管理:使用线程池来管理线程,而不是为每个文件创建一个线程,这在文件数量特别多时能够提高效率。
- 支持更多文件格式:目前的程序假设所有文件都是文本文件,并使用 isalnum 函数来检测单词。可以扩展支持其他文件格式,比如PDF或Word文档。
- Unicode支持:增加对非ASCII字符的支持,识别其他语言的单词。
- 用户交互:允许用户在程序运行时进行交互,比如取消正在处理的文件或查询当前处理的状态
6.感想
通过上述代码我们展示了基本的线程创建、同步和资源管理。但上述程序仍有很大改进空间,包括优化性能、提高错误处理能力以及增加新功能的可能性等等。
结束语
如果有疑问欢迎大家留言讨论,你如果觉得这篇文章对你有帮助可以给我一个免费的赞吗?你们的认可是我最大的分享动力!