系列文章目录
- 初探MPI——MPI简介
- 初探MPI——(阻塞)点对点通信
- 初探MPI——(非阻塞)点对点通信
- 初探MPI——集体通信
文章目录
- 系列文章目录
- 前言
- 一、Non-blocking communications
- 1.1 Block version
- 1.2 Non-blocking version
- 二、准备工作
- 2.1 Non-Blocking send and receive
- 2.1.1 `MPI_Isend`
- 2.1.2 `MPI_Irecv`
- 2.2 Waiting
- 2.3 Testing
- 2.4 `MPI_Barrier`
- 2.5 示例伪代码
- 三、代码实现
- 四、race condition(竞争条件)
- 五、Probing incoming communications
- 5.1 内存布局对MPI发送数据的影响
- 5.2 练习
- 总结
- 参考
前言
上一篇文章2. 初探MPI——(阻塞)点对点通信讲述了阻塞点对点通信。这意味着当一个进程发送或者接收信息时,它必须等待传输结束才能恢复到它正在做的事情。在某些应用中,这可能会受到极大的限制。这一章主要描述了非阻塞通信(在数值计算中,其实本人很少见到用非阻塞通信的,原因可能是因为进程将模拟区域划分成了多个网格,每个粒子或者每个区域的计算还是很依赖周围网格的数值,所以非阻塞通信用得还是少。当然也有可能是我才疏学浅,见识浅薄。)
个人见解:非阻塞点对点通信更像是多线程并行(除了没有统一内存),由此必然会产生临界区,竞争条件以及同步等问题。所以处理问题的逻辑和多线程相似,无外乎用的接口不一样罢了。
一、Non-blocking communications
蓝色是计算部分,黄色是发送或者接收部分
非阻塞通信始终需要初始化和完成。这意味着现在,我们将调用 send 和 receive 命令来初始化通信。然后,该过程将继续工作,而不是等待完成发送(或接收),并会不时检查一次以查看通信是否完成。这可能有点晦涩难懂,所以让我们一起举个例子。
想象一下,进程 0 必须首先工作 3 秒,然后工作 6 秒。同时,进程 1 必须工作 5 秒,然后工作 3 秒。它们必须在中间和结束时的某个时间同步。
1.1 Block version
从该示例中可以看出,阻塞通信将强制进程 0 等待进程 1。要进行第一次通信,两个程序都必须等待 5 秒钟。然后,要进行第二次通信,两者都必须再等待 6 秒。这意味着该程序总共至少需要 11 秒才能运行。由于实际发送/接收数据的开销,因此比这多一点。
if (rank == 0) {
sleep(3);
MPI_Send(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD);
sleep(6);
MPI_Send(buffer, buffer_count, MPI_INT, 1, 1, MPI_COMM_WORLD);
}
else {
sleep(5);
MPI_Recv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
sleep(3);
MPI_Recv(buffer, buffer_count, MPI_INT, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
//注意,使用不同的标签来表示两种不同的通信。
1.2 Non-blocking version
进程 0 可以继续工作(每毫秒进行探测),并且仅在进程 1 准备好,而不是进程 0 等待进程 1 进行第一次通信。
二、准备工作
为了书写阻塞版本和非阻塞版本的代码,我们需要了解一些知识点
2.1 Non-Blocking send and receive
2.1.1 MPI_Isend
函数定义:
int MPI_Isend(void *buffer,
int count,
MPI_Datatype datatype,
int dest,
int tag,
MPI_Communicator comm,
MPI_Request *request);
MPI_Request
这是一个复杂的对象,我们不会在这里详细说明。
See it that way :
MPI_Isend
is preparing a request. This request is going to be executed when both processes are ready to synchronise. This command only sets up the send, but actually does not transfer anything to the destination process, only prepares it.
Once this request has been prepared, it is necessary to complete it. There are two ways of completing a request : wait and test.
2.1.2 MPI_Irecv
函数定义:
int MPI_Irecv(void *buffer,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Communicator comm,
MPI_Request *request);
The request must then be completed by either waiting or testing.
2.2 Waiting
等待会强制进程进入“阻塞模式”。发送过程将仅等待请求完成。如果进程在 MPI_Isend
之后立即等待,则发送与调用 MPI_Send
相同。有两种等待 MPI_Wait
方式和 MPI_Waitany
.
int MPI_Wait(MPI_Request *request,
MPI_Status *status);
int MPI_Waitany(int count,
MPI_Request array_of_requests[],
int *index, MPI_Status *status);
前者 MPI_Wait
只是等待给定请求的完成。请求完成后,将在 status
中返回 的 MPI_Status
实例。
后者 MPI_Waitany
等待请求数组中第一个完成的请求继续。请求完成后,将 index 的值设置为存储已完成请求的 array_of_requests 索引。该调用还存储已完成请求的状态。
2.3 Testing
等待会阻塞该进程,直到请求(或多个请求)得到满足。test则会检查该请求是否可以被完成。如果可以,则自动完成请求并传输数据。
int MPI_Test(MPI_Request *request,
int *flag,
MPI_Status *status);
int MPI_Testany(int count,
MPI_Request array_of_requests[],
int *index,
int *flag,
MPI_Status *status);
前者 MPI_Test
参数 request 和 status 同wait。现在请记住,test是非阻塞的,因此在任何情况下,该进程都会在调用后继续执行。该变量 flag 用于告诉request是否在测试期间完成。如果 flag != 0 这意味着request已完成。
后者 MPI_Testany
同理。如果任何请求可满足,则该值设置为 flag 非零值。如果是这样, status 并且 index 还被赋予一个值。
2.4 MPI_Barrier
MPI_Barrier(MPI_COMM_WORLD);
此命令强制某个通信器(在本例 MPI_COMM_WORLD 中为)中的所有进程相互等待。因此,通信器中的进程将暂停,直到每个进程都达到一个障碍(不一定是同一个障碍)。然后,执行恢复。这是一种硬同步不同进程的方法。
2.5 示例伪代码
Two processes, waiting and testing on only one request
// 数据存储在一个名为 buffer 的变量中,定义为大小 buffer_count 为 int 的数组。
MPI_Request request;
MPI_Status status;
int request_complete = 0; // 一个标志,用来检查非阻塞操作是否完成(0表示未完成,非0表示完成)
// rank 0 发送, rank 1 接受
if (rank == 0) {
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
// Here we do some work while waiting for process 1 to be ready
while (has_work) {
do_work();
// 测试 request是否完成,如果没有完成继续测试
if (!request_complete)
MPI_Test(&request, &request_complete, &status);
}
// 此时rank0 已经完成工作, 如果此时request还没完成,那就等待
if (!request_complete)
MPI_Wait(&request, &status);
}else {
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
// 单纯等待消息
MPI_Wait(&request, &status);
}
三、代码实现
void play_non_blocking_scenario() {
MPI_Request request;
MPI_Status status;
int request_finished = 0;
// Initialising buffer :
for (int i=0; i < buffer_count; ++i)
buffer[i] = (rank == 0 ? i*2 : 0);
MPI_Barrier(MPI_COMM_WORLD);
// Starting the chronometer
double time = -MPI_Wtime(); // This command helps us measure time. We will see more about it later on !
// You should not modify anything BEFORE this point //
if (rank == 0) {
sleep(3);
// 1- Initialise the non-blocking send to process 1
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
double time_left = 6000.0; // 6000微妙=6秒
while (time_left > 0.0) {
usleep(1000); // 该函数单位为微秒,1000微秒为1ms
// 2- Test if the request is finished (only if not already finished)
if (!request_finished) {
MPI_Test(&request, &request_finished, &status);
}
// 1ms left to work
time_left -= 1.0;
}
// 3- If the request is not yet complete, wait here.
if (!request_finished) {
MPI_Wait(&request, &status);
}
// Modifying the buffer for second step
for (int i=0; i < buffer_count; ++i)
buffer[i] = -i;
// 4- Prepare another request for process 1 with a different tag
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 1, MPI_COMM_WORLD, &request);
request_finished = 0; // Reset the flag
time_left = 3000.0;
while (time_left > 0.0) {
usleep(1000); // We work for 1ms
// 5- Test if the request is finished (only if not already finished)
if (!request_finished) {
MPI_Test(&request, &request_finished, &status);
}
// 1ms left to work
time_left -= 1.0;
}
// 6- Wait for it to finish
if (!request_finished) {
MPI_Wait(&request, &status);
}
}
else {
// Work for 5 seconds
sleep(5);
// 7- Initialise the non-blocking receive from process 0
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
// 8- Wait here for the request to be completed
MPI_Wait(&request, &status);
print_buffer();
// Work for 3 seconds
sleep(3);
// 9- Initialise another non-blocking receive
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 1, MPI_COMM_WORLD, &request);
// 10- Wait for it to be completed
MPI_Wait(&request, &status);
print_buffer();
}
// should not modify anything AFTER this point //
// Stopping the chronometer
time += MPI_Wtime();
// This line gives us the maximum time elapsed on each process.
// We will see about reduction later on !
double final_time;
MPI_Reduce(&time, &final_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
if (rank == 0)
std::cout << "Total time for non-blocking scenario : " << final_time << "s" << std::endl;
}
运行结果:
详细解释一下下面这段代码:
double time_left = 6000.0;
while (time_left > 0.0) {
usleep(1000); // We work for 1ms
// 2- Test if the request is finished (only if not already finished)
if (!request_finished) {
MPI_Test(&request, &request_finished, &status);
}
// 1ms left to work
time_left -= 1.0;
}
// 3- If the request is not yet complete, wait here.
if (!request_finished) {
MPI_Wait(&request, &status);
}
MPI_Wait
函数被用来阻塞进程直到非阻塞请求(如发送或接收操作)完成。在此期间,程序的执行将会暂停,直到与 request
关联的操作完全完成。这意味着,一旦进入 MPI_Wait
,不需要进一步的循环或额外的检查,因为 MPI_Wait
保证在请求完成之前不会返回。
几个关键点:
-
MPI_Test
的使用:- 在
while
循环中,MPI_Test
被调用来非阻塞地检查request
是否已经完成。如果request
完成了,MPI_Test
会将request_finished
设置为非零值,并更新status
。 - 如果在循环结束前
request
完成,MPI_Test
会确保将request_finished
设置为真(非零),从而避免进入MPI_Wait
。
- 在
-
进入
MPI_Wait
:- 如果循环结束后
request_finished
仍然为零(意味着请求尚未完成),则执行MPI_Wait
。这里的MPI_Wait
是阻塞调用,它会停止进程的执行,直到关联的请求完全完成。 - 在
MPI_Wait
调用期间,你不需要手动更新request_finished
,因为这个调用会一直阻塞,直到请求完成。一旦请求完成,MPI_Wait
返回,程序可以继续执行。
- 如果循环结束后
-
MPI_Wait
后的执行:MPI_Wait
完成后,程序会继续执行MPI_Wait
后的代码。在此情况下,request_finished
变量实际上不再需要,因为MPI_Wait
的行为已经保证了请求的完成。
在多数情况下,使用 MPI_Test
和 MPI_Wait
的组合是为了允许在等待长时间操作(如网络通信)完成时,程序能够执行一些其他的计算或处理任务,从而提高程序的整体效率和响应性。当进入 MPI_Wait
,程序假定已经尽可能地延迟了等待,此时只需确保操作完成即可继续。
四、race condition(竞争条件)
先来看看这段代码
#include <iostream>
#include <unistd.h>
#include <mpi.h>
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
MPI_Request request;
MPI_Status status;
int request_complete = 0;
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
constexpr int buffer_count = 10;
int buffer[buffer_count];
// Rank 0 sends, rank 1 receives
if (rank == 0) {
// Filling the buffer
std::cout << "Process 0 is sending : ";
for (int i=0; i < buffer_count; ++i) {
buffer[i] = i*i;
std::cout << buffer[i] << " ";
}
std::cout << std::endl;
// Sending the data and waiting for 5 seconds
MPI_Isend(buffer, buffer_count, MPI_INT, 1, 0, MPI_COMM_WORLD, &request);
sleep(5);
}
else {
// Resetting the buffer
for (int i=0; i < buffer_count; ++i)
buffer[i] = 0;
// Receiving and sleeping for 5 seconds
MPI_Irecv(buffer, buffer_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &request);
sleep(5);
// Printing the buffer received by process 1
int ite = 0;
std::cout << "Process 1 received : ";
for (int i=0; i < buffer_count; ++i)
std::cout << buffer[i] << " ";
std::cout << std::endl;
}
MPI_Finalize();
}
输出结果是
Process 0 is sending : 0 1 4 9 16 25 36 49 64 81
Process 1 received : 0 0 0 0 0 0 0 0 0 0
这与预期不符合啊。怎么回事?
竞争条件和之前的线程并行是一样的,这的是两个进程尝试同时访问某些数据
如图,如果有两个进程 P0 和 P1 . P0 发送一条非阻塞消息, P1 然后重新开始在它刚刚发送的同一缓冲区上工作。 P1 只等待接收。如果请求在实际收到数据之前 P1 没有被处理,那么可能已经 P0 修改了其缓冲区,并且接收 P1 的数据可能是无效的。顾名思义,竞争条件指的是:两个处理都在争用数据。如果 P0 速度更快,则接收的数据 P1 可能不是您想要发送的数据。另一方面,如果 P1 速度更快,则接收到的数据将是正确的。但是,由于这两个过程是并行运行的,因此绝对无法保证结果。这在每个并行程序中都是一个非常严重的问题。在 MPI 中,避免了大多数竞争条件问题(与共享内存并行性相比,这些问题无处不在),但非阻塞通信是例外。
必须了解 MPI 中非阻塞通信背后的语义。非阻塞通信是两个进程正在达成的协议,以避免在满足请求之前读取或写入通信缓冲区。确保请求已完成的唯一方法是使用
MPI_Wait
或MPI_Test
。只有在此之后,进程才能使用通信缓冲区。因此,使用非阻塞发送或接收的程序在读取或写入通信缓冲区之前始终必须最终调用MPI_Wait
。
注意:
MPI_Isend
和MPI_Irecv
后面必须有时跟MPI_Test
和MPI_Wait
。在请求完成之前,发送进程不应写入缓冲区。另一方面,在请求完成之前,接收进程不应该读取缓冲区。知道请求是否完成的唯一方法是调用MPI_Wait
和MPI_Test
。
五、Probing incoming communications
在上一篇文章2. 初探MPI——(阻塞)点对点通信讲了MPI_Probe
函数在预估和动态消息传输的应用,今天再次重温该函数,并且进行稍微的拓展。
在之前的示例中,只有少量的进程和少量的通信。然而,在现实应用中,通信的数据量可能非常大,在这种情况下,优化发送的消息的大小会对系统的性能产生真正的影响。
作为一般经验法则,在 MPI 通信时应始终遵守两点:
- Try to group as many data as possible in one communication. Sending N communications of M bytes will always be more costly than one communication of N*M bytes.
- Try to send the exact amount of data you are storing in your buffer and no more.
第二点的解决方式就是:Probing the message. Basically asking MPI to give you the size of the message.
如果您想探测任何类型的消息或来自任何来源的消息的接收,可以使用 MPI_ANY_SOURCE
和 MPI_ANY_TAG
。然后,可以将生成的 MPI_Status
对象与其他函数结合使用来获取更多信息。
举一个简单的例子:
#include <iostream>
#include <cstdlib>
#include <mpi.h>
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
int size, rank;
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
// Process 0 is sending a random number (between 10 and 25) of integers to process 1
int n_items = rand() % 16 + 10; // BAD way of doing random.
std::cout << "Process 0, random count gives us " << n_items << " ints to send." << std::endl;
// Allocation and initialisation of the buffer
int *send_buf = new int[n_items];
for (int i=0; i < n_items; ++i)
send_buf[i] = i*i;
std::cout << "Process 0, sending : ";
for (int i=0; i < n_items; ++i)
std::cout << send_buf[i] << " ";
std::cout << std::endl;
// Blocking send
MPI_Send(send_buf, n_items, MPI_INT, 1, 0, MPI_COMM_WORLD);
// Deallocation
delete [] send_buf;
}
else {
// Probing the reception of messages
MPI_Status status;
MPI_Probe(0, 0, MPI_COMM_WORLD, &status);
// From the probed status we get the number of elements to receive
int n_items;
MPI_Get_count(&status, MPI_INT, &n_items);
std::cout << "Process 1, probing tells us message will have " << n_items << " ints." << std::endl;
// Allocating and receiving
int *recv_buf = new int[n_items];
MPI_Recv(recv_buf, n_items, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Process 1, buffer received : ";
for (int i=0; i < n_items; ++i)
std::cout << recv_buf[i] << " ";
std::cout << std::endl;
delete [] recv_buf;
}
MPI_Finalize();
return 0;
}
5.1 内存布局对MPI发送数据的影响
讨论如何以及为什么内存布局会影响到向MPI发送数据时的正确性。让我们解释一下。
首先,看第一段代码:
float vectors[100][3]; // A 100 vector table
int n_to_send = 10;
fill_in_vectors(vectors, n_to_send);
MPI_Send(vectors, n_to_send*3, MPI_FLOAT, 1, 0, MPI_COMM_WORLD);
这段代码中,vectors
是一个100行3列的数组,表示100个3D向量。fill_in_vectors
函数用来填充这些向量。然后,MPI_Send
函数被调用,将vectors
数组中的数据发送给进程1。MPI_Send
的第二个参数是要发送的数据的大小,这里是n_to_send*3
,因为每个向量有3个浮点数,所以乘以3。
接着,看第二段代码:
float vectors[3][100]; // Same but inverted layout
int n_to_send = 10;
fill_in_vectors(vectors, n_to_send);
MPI_Send(vectors, n_to_send*3, MPI_FLOAT, 1, 0, MPI_COMM_WORLD);
这段代码和第一段类似,但是vectors
的定义发生了变化,现在是一个3行100列的数组。这意味着数据在内存中的布局是不同的。虽然它包含相同的数据,但是由于内存布局的不同,它表示的含义也不同。
在两种情况下,MPI_Send
函数的参数都是相同的,都是将vectors
数组中的数据发送给进程1。但是,由于内存布局的不同,实际上发送的数据是不同的。
在第一种情况下,vectors
数组中的每一行代表一个3D向量,因此发送的是10个3D向量的数据。
而在第二种情况下,vectors
数组中的每一列代表一个3D向量,因此发送的是3个3D向量的数据,因为数组的行数为3。
因此,这段代码在探讨了内存布局对于数据传输的影响,以及在处理3D向量时,选择合适的内存布局是非常重要的。
如果用fortran写,则结果会相反
5.2 练习
要求:
You must fill in the function so that the process receives a message from any process. Then writes to stdout Received a message from process ## with tag ##
. Then, if tag is 0, receive the message as a single integer and add it to int_sum
. Else, if the tag is 1, receive the message as a single float and add it to the float_sum
variable.
代码:
void probing_process(int &int_sum, float &float_sum) {
MPI_Status status;
int count;
// 1- Probe the incoming message
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// 2- Get the tag and the source
int tag = status.MPI_TAG;
int source = status.MPI_SOURCE;
// Printing the message
std::cout << "Received a message from process " << source << " with tag " << tag << std::endl;
// 3- Add to int_sum or float_sum depending on the tag of the message
MPI_Get_count(&status, MPI_FLOAT, &count);
if(tag==0){
int received_int;
MPI_Recv(&received_int, count, MPI_INT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
int_sum += received_int;
}else {
// Message is a float
float received_float;
MPI_Recv(&received_float, count, MPI_FLOAT, source, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
float_sum += received_float;
}
}
运行结果:
总结
- 非阻塞点对点通信需要初始化request
- 在
MPI_Test
会有flag去判断消息是否已经被接受。 MPI_Wait
进程停止等待- 竞争条件在非阻塞点对点通信非常常见,我们需要将改写缓存区的操作设置成为临界区。
- 在发送消息的时候要灵活应用
MPI_Probe
实现预估和动态传输消息中一些具体的其他信息。
参考
- MPI Tutorials