使用mpi并行技术实现wordcount算法

【问题描述】

编写程序统计一个英文文本文件中每个单词的出现次数(词频统计),并将统计结果按单词字典序输出到屏幕上。

注:在此单词为仅由字母组成的字符序列。包含大写字母的单词应将大写字母转换为小写字母后统计。

【输入形式】

打开当前目录下文件article.txt;,从中读取英文单词进行词频统计。

【输出形式】

程序将单词统计结果按单词字典序输出到屏幕上,每行输出一个单词及其出现次数,单词和其出现次数间由一个空格分隔,出现次数后无空格,直接为回车。

【样例输入】

当前目录下文件article.txt内容如下:

Do not take to heart every thing you hear.

Do not spend all that you have.

Do not sleep as long as you want;

【样例输出】

all 1

as 2

do 3

every 1

have 1

hear 1

heart 1

long 1

not 3

sleep 1

spend 1

take 1

that 1

thing 1

to 1

want 1

you 3

【样例说明】

输出单词及其出现次数。

数据集下载:wordcount数据集

提取码:k3v2


代码实现:

#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include <string>
#include <cstring>
#include <fstream>
#include <sstream>
#include <iterator>
#include <vector>
#include <map>
#include <unordered_map>
#include <dirent.h>
#include<algorithm>
#include <iostream>
#include <mpi.h>

using namespace std;

void getFiles(string path, vector<string>& filenames)
{
    DIR *pDir;
    struct dirent* ptr;
    if(!(pDir = opendir(path.c_str()))){
        return;
    }
    while((ptr = readdir(pDir))!=0) {
        if (strcmp(ptr->d_name, ".") != 0 && strcmp(ptr->d_name, "..") != 0){
            filenames.push_back(path + "/" + ptr->d_name);
    }
    }
    closedir(pDir);
}

std::string readFile(std::string filename) {
    std::ifstream in(filename);

    in.seekg(0, std::ios::end);
    size_t len = in.tellg();
    in.seekg(0);
    std::string contents(len + 1, '\0');
    in.read(&contents[0], len);
    return contents;
}

std::vector<std::string> split(std::string const &input) { 
    vector<string> ret;
    int i=0,j=0,n=input.length();
    string temp;
    while(j<n){
        if ((input[j] >= 'a' && input[j] <= 'z') || (input[j] >= 'A' && input[j] <= 'Z')) {
            j++;
        }
        else {
            if (i < j) {
                temp = input.substr(i, j - i);
                ret.emplace_back(temp);
            }
            j++;
            i = j;
       }
   }
   return ret;
}

std::vector<std::string> getWords(
    std::string &content, int rank, int worldsize) {
    std::vector<std::string> wordList = split(content);
    std::vector<std::string> re;
    std::string tmp;
    for (int i = 0 ; i < wordList.size(); i++) {
        if (i % worldsize == rank) {
            //re.push_back(wordList[i]);
            tmp += " " + wordList[i];
        }
    }
    re.push_back(tmp);
    return re;
}

std::vector<pair<std::string, int>> countWords(
    std::vector<std::string> &contentList) {
    // split words
    std::vector<std::string> wordList;
    std::string concat_content;
    for (auto it = contentList.begin(); it != contentList.end(); it++) {
        std::string content = (*it);
        concat_content += " " + content;
    }
    wordList = split(concat_content);

    // do the word count
    std::map<std::string, int> counts;
    for (auto it = wordList.begin(); it != wordList.end(); it++) {
        if (counts.find(*it) != counts.end()) {
            counts[*it] += 1;
        } else {
            counts[*it] = 1;
        }
    }
    std::vector<pair<std::string, int>> res;
    for (auto it = counts.begin(); it != counts.end(); it++) {
        res.push_back(std::make_pair(it->first, it->second));
    }
    return res;
}

std::vector<pair<std::string, int>> mergeCounts(
    std::vector<pair<std::string, int>> &countListA,
    std::vector<pair<std::string, int>> &countListB) {
    std::map<std::string, int> counts;
    for (auto it = countListA.begin(); it != countListA.end(); it++) {
        counts[it->first] = it->second;
    }
    for (auto it = countListB.begin(); it != countListB.end(); it++) {
        if (counts.find(it->first) == counts.end())
            counts[it->first] = it->second;
        else
            counts[it->first] += it->second;
    }
    std::vector<pair<std::string, int>> res;
    for (auto it = counts.begin(); it != counts.end(); it++) {
        res.push_back(std::make_pair(it->first, it->second));
    }
    return res;
}

void sendLocalCounts(int from, int to,
                     std::vector<pair<std::string, int>> &counts) {
    int num = counts.size();
    MPI_Send(&num, 1, MPI_INT, to, from, MPI_COMM_WORLD);

    if (num) {
        int *counts_array = new int[num];
        int i = 0;
        for (auto it = counts.begin(); it != counts.end(); it++, i++) {
            counts_array[i] = it->second;
        }
        MPI_Send(counts_array, num, MPI_INT, to, from, MPI_COMM_WORLD);
        delete counts_array;
    }

    std::string words = " ";
    for (auto it = counts.begin(); it != counts.end(); it++) {
        words += it->first;
        words += " ";
    }
    num = words.length();
    MPI_Send(&num, 1, MPI_INT, to, from, MPI_COMM_WORLD);
    if (num) {
        char *_words = new char[num];
        words.copy(_words, num);
        MPI_Send(_words, num, MPI_CHAR, to, from, MPI_COMM_WORLD);
        delete _words;
    }
}

void recvCounts(int from, int to, std::vector<pair<std::string, int>> &counts) {
    MPI_Status status;
    int _num = 0, num = 0;
    int *counts_array;
    char *_words;
    std::string words;
    MPI_Recv(&_num, 1, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    if (_num) {
        counts_array = new int[_num];
        MPI_Recv(counts_array, _num, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    }

    MPI_Recv(&num, 1, MPI_INT, from, from, MPI_COMM_WORLD, &status);
    if (num) {
        _words = new char[num];
        MPI_Recv(_words, num, MPI_CHAR, from, from, MPI_COMM_WORLD, &status);
        
        for (int _i = 0; _i < num;  _i++) words+=_words[_i];
        delete _words;
    }

    if (_num) {
        std::vector<std::string> word_vec = split(words);
        for (int i = 0; i < _num; i++) {
            counts.push_back(std::make_pair(word_vec[i], counts_array[i]));
        }
        delete counts_array;
    }
}

void treeMerge(int id, int worldSize,
               std::vector<pair<std::string, int>> &counts) {
    int participants = worldSize;
    while (participants > 1) {
        MPI_Barrier(MPI_COMM_WORLD);
        int _participants = participants / 2 + (participants % 2 ? 1 : 0);
        if (id < _participants) {
            if (id + _participants < participants) {
                std::vector<pair<std::string, int>> _counts;
                std::vector<pair<std::string, int>> temp;
                recvCounts(id + _participants, id, _counts);
                temp = mergeCounts(_counts, counts);
                counts = temp;
            }
        }
        if (id >= _participants && id < participants) {
            sendLocalCounts(id, id - _participants, counts);
        }
        participants = _participants;
    }
}

int main(int argc, char *argv[]) {
    int rank;
    int worldSize;
    MPI_Init(&argc, &argv);

    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /*
    * Word Count for big file
    */
{
    struct timeval start, stop;
    gettimeofday(&start, NULL);
    std::string big_file = "input.txt";
    auto content = readFile(big_file);
    transform(content.begin(),content.end(),content.begin(), ::tolower);
    auto partContent = getWords(content, rank, worldSize);

    auto counts = countWords(partContent);

    treeMerge(rank, worldSize, counts);
    gettimeofday(&stop, NULL);
    
    
    if (rank == 0) {
        fstream dataFile;
        std::string out_file = "./output.txt";
        dataFile.open("./output.txt", ios::out);
        int num = 0;
        for (auto it = counts.begin(); it != counts.end(); it++) {
            //num = it->second/worldSize;
            num = it->second;
            dataFile << it->first << " : " << num << endl;
            cout<< it->first << " : " << num << endl;

        }
    }
    if (rank == 0) {
        cout << "times: "
             << (stop.tv_sec - start.tv_sec) * 1000.0 +
                    (stop.tv_usec - start.tv_usec) / 1000.0
             << " ms"<< endl;
    }
   
}

    MPI_Finalize();
    return 0;
}

编译:mpicxx ./filename.cpp  -o ./filename

运行:mpirun -n 2 ./filename

运行结果:

Mpi:一个线程时:

四个线程时:

加速比:8668.63/3854.48=2.254163985803533

加速效果明显。

Mpi基本原理:

  1.什么是MPI

Massage Passing Interface:是消息传递函数库的标准规范,由MPI论坛开发。

一种新的库描述,不是一种语言。共有上百个函数调用接口,提供与C和Fortran语言的绑定

MPI是一种标准或规范的代表,而不是特指某一个对它的具体实现

MPI是一种消息传递编程模型,并成为这种编程模型的代表和事实上的标准

2.MPI的特点

MPI有以下的特点:

消息传递式并行程序设计

指用户必须通过显式地发送和接收消息来实现处理机间的数据交换。

在这种并行编程中,每个并行进程均有自己独立的地址空间,相互之间访问不能直接进行,必须通过显式的消息传递来实现。

这种编程方式是大规模并行处理机(MPP)和机群(Cluster)采用的主要编程方式。

并行计算粒度大,特别适合于大规模可扩展并行算法

用户决定问题分解策略、进程间的数据交换策略,在挖掘潜在并行性方面更主动,并行计算粒度大,特别适合于大规模可扩展并行算法

消息传递是当前并行计算领域的一个非常重要的并行程序设计方式

二、MPI的基本函数

MPI调用借口的总数虽然庞大,但根据实际编写MPI的经验,常用的MPI函数是以下6个:

MPI_Init(…);

MPI_Comm_size(…);

MPI_Comm_rank(…);

MPI_Send(…);

MPI_Recv(…);

MPI_Finalize();

三、MPI的通信机制

MPI是一种基于消息传递的编程模型,不同进程间通过消息交换数据。

1.MPI点对点通信类型

所谓点对点的通信就是一个进程跟另一个进程的通信,而下面的聚合通信就是一个进程和多个进程的通信。

  1. 标准模式:

该模式下MPI有可能先缓冲该消息,也可能直接发送,可理解为直接送信或通过邮局送信。是最常用的发送方式。

由MPI决定是否缓冲消息

没有足够的系统缓冲区时或出于性能的考虑,MPI可能进行直接拷贝:仅当相应的接收完成后,发送语句才能返回。

这里的系统缓冲区是指由MPI系统管理的缓冲区。而非进程管理的缓冲区。

MPI环境定义有三种缓冲区:应用缓冲区、系统缓冲区、用户向系统注册的通信用缓冲区

MPI缓冲消息:发送语句在相应的接收语句完成前返回。

这时后发送的结束或称发送的完成== 消息已从发送方发出,而不是滞留在发送方的系统缓冲区中。

该模式发送操作的成功与否依赖于接收操作,我们称之为非本地的,即发送操作的成功与否跟本地没关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/31779.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ChatGPT使用的SSE技术是什么?

在现代web应用程序中&#xff0c;实时通信变得越来越重要。HTTP协议的传统请求/响应模式总是需要定期进行轮询以获得最新的数据&#xff0c;这种方式效率低下并且浪费资源。因此&#xff0c;出现了一些新的通信技术&#xff0c;如WebSocket和SSE。但是&#xff0c;GPT为什么选择…

分布式数据库架构

分布式数据库架构 1、MySQL常见架构设计 对于mysql架构&#xff0c;一定会使用到读写分离&#xff0c;在此基础上有五种常见架构设计&#xff1a;一主一从或多从、主主复制、级联复制、主主与级联复制结合。 1.1、主从复制 这种架构设计是使用的最多的。在读写分离的基础上…

JS 介绍 Babel 的使用及 presets plugins 的概念

一、Babel 是什么 Bebal 可以帮助我们将新 JS 语法编译为可执行且兼容旧浏览器版本的一款编译工具。 举个例子&#xff0c;ES6&#xff08;编译前&#xff09;&#xff1a; const fn () > {};ES5&#xff08;编译后&#xff09;&#xff1a; var fn function() {}二、B…

设计模式-抽象工厂模式

抽象工厂模式 1、抽象工厂模式简介2、具体实现 1、抽象工厂模式简介 抽象工厂模式(Abstract Factory Pattern)在工厂模式尚添加了一个创建不同工厂的抽象接口(抽象类或接口实现)&#xff0c;该接口可叫做超级工厂。在使用过程中&#xff0c;我们首先通过抽象接口创建不同的工厂…

【HTML界面设计(二)】说说模块、登录界面

记录很早之前写的前端界面&#xff08;具体时间有点久远&#xff09; 一、说说模板 采用 适配器&#xff08;Adapter&#xff09;原理 来设计这款说说模板&#xff0c;首先看一下完整效果 这是demo样图&#xff0c;需要通过业务需求进行修改的部分 这一部分&#xff0c;就是dem…

Redis系列--布隆过滤器(Bloom Filter)

一、前言 在实际开发中&#xff0c;会遇到很多要判断一个元素是否在某个集合中的业务场景&#xff0c;类似于垃圾邮件的识别&#xff0c;恶意ip地址的访问&#xff0c;缓存穿透等情况。类似于缓存穿透这种情况&#xff0c;有许多的解决方法&#xff0c;如&#xff1a;redis存储…

宏景eHR SQL注入漏洞复现(CNVD-2023-08743)

0x01 产品简介 宏景eHR人力资源管理软件是一款人力资源管理与数字化应用相融合&#xff0c;满足动态化、协同化、流程化、战略化需求的软件。 0x02 漏洞概述 宏景eHR 存在SQL注入漏洞&#xff0c;未经过身份认证的远程攻击者可利用此漏洞执行任意SQL指令&#xff0c;从而窃取数…

如何在大规模服务中迁移缓存

当您启动初始服务时&#xff0c;通常会过度设计以考虑大量流量。但是&#xff0c;当您的服务达到爆炸式增长阶段&#xff0c;或者如果您的服务请求和处理大量流量时&#xff0c;您将需要重新考虑您的架构以适应它。糟糕的系统设计导致难以扩展或无法满足处理大量流量的需求&…

docker基础

文章目录 通过Vagrant安装虚拟机修改虚拟机网络配置 docker CE安装(在linux上)docker desktop安装(在MacOS上)Docker架构关于-阿里云镜像加速服务配置centos卸载docker 官网: http://www.docker.com 仓库: https://hub.docker.com Docker安装在虚拟机上&#xff0c;可以通过V…

Go语言的TCP和HTTP网络服务基础

目录 【TCP Socket 编程模型】 Socket读操作 【HTTP网络服务】 HTTP客户端 HTTP服务端 TCP/IP 网络模型实现了两种传输层协议&#xff1a;TCP 和 UDP&#xff0c;其中TCP 是面向连接的流协议&#xff0c;为通信的两端提供稳定可靠的数据传输服务&#xff1b;UDP 提供了一种…

[MySQL]不就是SQL语句

前言 本期主要的学习目标是SQl语句中的DDL和DML实现对数据库的操作和增删改功能&#xff0c;学习完本章节之后需要对SQL语句手到擒来。 1.SQL语句基本介绍 SQL&#xff08;Structured Query Language&#xff09;是一种用于管理关系型数据库的编程语言。它允许用户在数据库中存…

双因素身份验证在远程访问中的重要性

在快速发展的数字环境中&#xff0c;远程访问计算机和其他设备已成为企业运营的必要条件。无论是在家庭办公室运营的小型初创公司&#xff0c;还是团队分散在全球各地的跨国公司&#xff0c;远程访问解决方案都能保证工作效率和连接性&#xff0c;能够跨越距离和时间的阻碍。 …

7Z045 引脚功能详解

本文针对7Z045芯片&#xff0c;详细讲解硬件设计需要注意的技术点&#xff0c;可以作为设计和检查时候的参考文件。问了方便实用&#xff0c;按照Bank顺序排列&#xff0c;包含配置Bank、HR Bank、HP Bank、GTX Bank、供电引脚等。 参考文档包括&#xff1a; ds191-XC7Z030-X…

怎么计算 flex-shrink 的缩放尺寸

计算公式: 子元素的宽度 - (子元素的宽度的总和 - 父盒子的宽度) * (某个元素的flex-shrink / flex-shrink总和) 面试问题是这样的下面 left 和 right 的宽度分别是多少 * {padding: 0;margin: 0;}.container {width: 500px;height: 300px;display: flex;}.left {width: 500px…

红日靶场(一)外网到内网速通

红日靶场&#xff08;一&#xff09; 下载地址&#xff1a;http://vulnstack.qiyuanxuetang.net/vuln/detail/2/ win7:双网卡机器 win2003:域内机器 win2008域控 web阶段 访问目标机器 先进行一波信息收集&#xff0c;扫一下端口和目录 扫到phpmyadmin&#xff0c;还有一堆…

【资料分享】Xilinx Zynq-7010/7020工业核心板规格书(双核ARM Cortex-A9 + FPGA,主频766MHz)

1 核心板简介 创龙科技SOM-TLZ7x是一款基于Xilinx Zynq-7000系列XC7Z010/XC7Z020高性能低功耗处理器设计的异构多核SoC工业核心板&#xff0c;处理器集成PS端双核ARM Cortex-A9 PL端Artix-7架构28nm可编程逻辑资源&#xff0c;通过工业级B2B连接器引出千兆网口、USB、CAN、UA…

Triton教程 --- 动态批处理

Triton教程 — 动态批处理 Triton系列教程: 快速开始利用Triton部署你自己的模型Triton架构模型仓库存储代理模型设置优化动态批处理 Triton 提供了动态批处理功能&#xff0c;将多个请求组合在一起执行同一模型以提供更大的吞吐量。 默认情况下&#xff0c;只有当每个输入在…

【开源与项目实战:开源实战】81 | 开源实战三(上):借Google Guava学习发现和开发通用功能模块

上几节课&#xff0c;我们拿 Unix 这个超级大型开源软件的开发作为引子&#xff0c;从代码设计编写和研发管理两个角度&#xff0c;讲了如何应对大型复杂项目的开发。接下来&#xff0c;我们再讲一下 Google 开源的 Java 开发库 Google Guava。 Google Guava 是一个非常成功、…

io.netty学习(十一)Reactor 模型

目录 前言 传统服务的设计模型 NIO 分发模型 Reactor 模型 1、Reactor 处理请求的流程 2、Reactor 三种角色 单Reactor 单线程模型 1、消息处理流程 2、缺点 单Reactor 多线程模型 1、消息处理流程 2、缺点 主从Reactor 多线程模型 主从Reactor 多线程模型示例 1…

CTF-Show密码学:ZIP文件密码破解【暴力破解】

萌新 隐写23 题目内容&#xff1a; 文件的主人喜欢用生日做密码&#xff0c;而且还是个90后。 一、已知条件 在这个题目中&#xff0c;我们有以下已知条件&#xff1a; 文件的主人喜欢用生日做密码 - 这个条件告诉我们&#xff0c;密码可能是一个八位的纯数字密码&#xff0c…