MPI Tutorial
1. Fundamental MPI concepts
MPI is short for Message Passing Interface. It was designed by the HPC community for writing parallel codes that run on distributed memory systems (e.g., a cluster with many nodes). It has been widely adopted as a complement to OpenMP, which is used to parallelize code on a single shared-memory multi-core machine. MPI implements the message passing model of parallel programming. There are a couple of core concepts behind MPI.
(1) The notion of a communicator. A communicator defines a group of processes that have the ability to communicate with one another. In this group of processes, each is assigned a unique rank, and they explicitly communicate with one another by their ranks.
(2) The basic communication model. Its core is built upon send and receive operations between processes. A process may send a message to another process by providing the rank of that process and a tag to identify the message. The receiver can then post a receive for a message with a given tag (or it may not care about the tag at all), and then handle the data accordingly. Communications such as this, which involve one sender and one receiver, are known as point-to-point communications.
(3) Collective communication. There are many cases where processes need to communicate with everyone else, for example, when a manager process needs to broadcast information to all of its worker processes. In this case, it would be cumbersome to write code that does all of the sends and receives, and such code would often not use the network in an optimal manner. MPI can handle a wide variety of these collective communications that involve all processes.
So in this tutorial, we break up the materials into the following sections:
(A) MPI setup, compile and run: MPI_hello_world.c
(B) Point-to-point communication
(C) Collective communication
The code examples used in this tutorial can be found at: https://github.com/mpitutorial/mpitutorial/tree/gh-pages/tutorials
Full MPI APIs and tutorial: https://computing.llnl.gov/tutorials/mpi/
This material is also based on https://computing.llnl.gov/tutorials/mpi/ and www.mpitutorial.com.
MPI setup, compile and run: MPI_hello_world.c
You have two options for running MPI in this tutorial: installing and running it on your own machine, or using an Amazon autoscaling cluster. In this section we detail how to access the cluster.
Connecting to the Cluster:
First, you will need to connect to the USYD VPN. For the majority of students the following instructions will work; however, if your location prohibits normal VPN use, you should find alternative instructions on the university website, or run your own installation.
Using openconnect on a Linux distribution, the command to start a VPN connection is:
sudo openconnect -u
With the VPN running in one terminal, we can now connect to the cluster using ssh. You can find the ssh private key on Ed. Save it in your .ssh directory under an appropriate name (here we've used student_cluster), with read-only permission for your local user and no permissions for other users. To connect to the cluster we then run:
ssh -i
Once on the server, we can compile and run our MPI programs. For today we will be using Slurm to automatically spin up or kill nodes.
Code should be compiled for MPI use with mpicc and run with mpirun, where the -n flag specifies the number of processes.
Code can then be run through a Slurm session by allocating one with salloc. The full build and run chain is then:
mpicc mpi_test.c -o mpi_test
salloc -n2 mpirun -n2 mpi_test
Given you will be sharing the machine with multiple other students, you can check your job’s current status using the squeue command, or cancel your job using scancel.
There are multiple MPI implementations that may be managed and run by Slurm. You can see which are installed using srun --mpi=list.
Please keep in mind that we only have 5 hyper-threaded nodes: values of -n over 2 will wait for additional nodes to spin up, and values over 10 will not complete.
Here’s a small program for you to compile and test using MPI.
This is a simple MPI program. Please try to understand all these common MPI APIs.
Point-to-point Communication
1. Blocking MPI Send and Receive
MPI's send and receive calls operate in the following manner. First, process A decides a message needs to be sent to process B. Process A then packs all of its necessary data into a buffer for process B. These buffers are often referred to as envelopes, since the data is packed into a single message before transmission (similar to how letters are packed into envelopes before being taken to the post office). After the data is packed into a buffer, the communication device (often a network) is responsible for routing the message to the proper location, which is identified by the destination process's rank. Even though the message is routed to B, process B still has to acknowledge that it wants to receive A's data. Once it does this, the data is transmitted, process A is notified that the transmission has completed, and it may go back to work.
Blocking MPI send / recv program:
MPI_Comm_rank and MPI_Comm_size are first used to determine the rank of the process and the world size. Then process zero initializes a number to the value of negative one and sends this value to process one. As you can see in the else if statement, process one calls MPI_Recv to receive the number and then prints the received value. Since we are sending and receiving exactly one integer, each process requests that one MPI_INT be sent/received. Each process also uses a tag number of zero to identify the message. Since only one type of message is being transmitted, the receiving process could also have used the predefined constant MPI_ANY_TAG (which is only valid on the receive side) instead of a specific tag number.
The other elementary MPI datatypes are listed below with their equivalent C datatypes.
MPI: Blocking vs. Non-blocking
Blocking communication is done using MPI_Send() and MPI_Recv(), as in the example shown above. These functions do not return (i.e., they block) until the communication is finished. Simplifying somewhat, this means that the buffer passed to MPI_Send() can be reused, either because MPI saved it somewhere, or because it has been received by the destination. Similarly, MPI_Recv() returns when the receive buffer has been filled with valid data.
In contrast, non-blocking communication is done using MPI_Isend() and MPI_Irecv(). These functions return immediately (i.e., they do not block), even if the communication has not finished yet. You must call MPI_Wait() to block until the communication has finished, or MPI_Test() to check whether it has finished without blocking.
Blocking communication is used when it is sufficient, since it is somewhat easier to use. Non-blocking communication is used when necessary; for example, you may call MPI_Isend(), do some computations, and then call MPI_Wait(). This allows computation and communication to overlap, which generally leads to improved performance.
Using the following blocking communication between two processes as an example:
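if (rank == 0)
{
    MPI_Send(&x, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);                    // send x to process 1
    MPI_Recv(&y, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive y from process 1
}
if (rank == 1)
{
    MPI_Send(&y, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);                    // send y to process 0
    MPI_Recv(&x, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); // receive x from process 0
}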
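What happens in this case?
1. Process 0 sends x to process 1 and blocks until process 1 receives x.
2. Process 1 sends y to process 0 and blocks until process 0 receives y.
3. But neither process ever reaches its MPI_Recv, because each is still blocked in its MPI_Send, so both wait forever until the two processes are killed. This is a deadlock.
(For small messages, many MPI implementations buffer the data and let MPI_Send return early, so this code may appear to work; the MPI standard does not guarantee this, so the pattern is unsafe.)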
For example, a non-blocking MPI sender might look like this:
*buf = 3;
MPI_Isend(buf, 1, MPI_INT, …);
*buf = 4;
MPI_Wait(…);
What will the receiver receive? 3?
Answer: 3, 4 or anything else. Because MPI_Isend() is non-blocking, the buffer may still be in use while it is being overwritten, so there is a race condition over which value is sent. How can the code be changed so that the receiver receives 3? Complete the send before modifying the buffer:
*buf = 3;
MPI_Isend(buf, 1, MPI_INT, …);
MPI_Wait(…);
*buf = 4;
Blocking MPI Send and Recv example: Ring Program
The ring program initializes a value on process zero, and the value is passed around every single process. The program terminates when process zero receives the value from the last process. As you can see from the program, extra care is taken to ensure that it doesn't deadlock. In other words, process zero makes sure that it has completed its first send before it tries to receive the value from the last process. All of the other processes simply call MPI_Recv (receiving from their neighboring lower process) and then MPI_Send (sending the value to their neighboring higher process) to pass the value along the ring. Because MPI_Recv blocks until the value has arrived, a process can only print after its lower neighbor has sent the value, so the printfs should occur in the order in which the value is passed.
2. Dynamic Receiving with MPI_Status and MPI_Probe
An example of querying the MPI_Status structure
Assuming there are two MPI processes, the program sends a random amount of numbers to a receiver, and the receiver then finds out how many numbers were sent. The main part of the code looks like this.
As we can see, process zero sends a random number of integers, up to MAX_NUMBERS, to process one. Process one then calls MPI_Recv for a total of MAX_NUMBERS integers. Although process one passes MAX_NUMBERS as the count argument to MPI_Recv, this is only an upper bound: process one will receive at most this many numbers. In the code, process one calls MPI_Get_count with MPI_INT as the datatype to find out how many integers were actually received. Along with printing the size of the received message, process one also prints the source and tag of the message by accessing the MPI_SOURCE and MPI_TAG fields of the status structure. As a clarification, the value returned by MPI_Get_count is relative to the datatype that is passed. If the user were to use MPI_CHAR as the datatype, the returned amount would be four times as large (assuming an integer is four bytes and a char is one byte).
An example of using MPI_Probe to find out the message size
Now that you understand how the MPI_Status object works, we can use it to our advantage a little more. Instead of posting a receive with a really large buffer to handle all possible message sizes (as we did in the last example), you can use MPI_Probe to query the message size before actually receiving it.
MPI_Probe looks quite similar to MPI_Recv. In fact, you can think of MPI_Probe as an MPI_Recv that does everything but receive the message. Similar to MPI_Recv, MPI_Probe will block for a message with a matching tag and sender. When the message is available, it will fill the status structure with information. The user can then use MPI_Recv to receive the actual message.
Similar to the last example, process zero picks a random amount of numbers to send to process one. What is different in this example is that process one now calls MPI_Probe to find out how many elements process zero is trying to send (using MPI_Get_count). Process one then allocates a buffer of the proper size and receives the numbers. Running the code will look similar to this.
MPI Collective Communication
Broadcasting and Synchronization
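Collective communication is a method of communication that involves the participation of all processes in a communicator. One thing to remember about collective communication is that it can act as a synchronization point among processes: every process in the communicator must reach the collective call in its code before the operation can complete. (Strictly speaking, the MPI standard only guarantees full synchronization for MPI_Barrier; other collectives may let some processes return early, so they should not be relied on for synchronization.)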
Before going into detail about collective communication routines, let’s examine synchronization in more detail. As it turns out, MPI has a special function that is dedicated to synchronizing processes:
MPI_Barrier (MPI_Comm communicator)
The name of the function is quite descriptive – the function forms a barrier, and no processes in the communicator can pass the barrier until all of them call the function.
Broadcasting with MPI_Bcast: A broadcast is one of the standard collective communication techniques. During a broadcast, one process sends the same data to all processes in a communicator. One of the main uses of broadcasting is to send out user input to a parallel program, or send out configuration parameters to all processes.
In this example, process zero is the root process, and it has the initial copy of data. All of the other processes receive the copy of data.
In MPI, broadcasting can be accomplished by using MPI_Bcast. The function prototype looks like this:
MPI_Bcast(
void* data,
int count,
MPI_Datatype datatype,
int root,
MPI_Comm communicator)
Although the root process and receiver processes do different jobs, they all call the same MPI_Bcast function. When the root process (in our example, it was process zero) calls MPI_Bcast, the data variable will be sent to all other processes. When all of the receiver processes call MPI_Bcast, the data variable will be filled in with the data from the root process.
Two important features about MPI_Bcast:
(1) MPI_Bcast uses a smarter implementation than a simple loop of sends, typically a tree-based communication algorithm that can use more of the available network links at once. Imagine that each process has only one outgoing/incoming network link: a naive broadcast would use only the single link from process zero to send all of the data.
For example, assume there are 8 processes. With the naive approach, process zero has to send the data 7 times sequentially, once to each of the other processes. This is not efficient at all.
MPI_Bcast takes a smarter approach. In this illustration, process zero starts off with the data and sends it to process one in the first stage. In the second stage, process zero sends the data to process two, while process one helps out the root process by forwarding the data to process three. During the second stage, two network connections are utilized at a time. The network utilization doubles at every subsequent stage of the tree communication until all processes have received the data.
For example:
(2) Be careful: a broadcast is written differently from blocking MPI send and recv!
For example, this code will hang:
It is written in the typical fashion of MPI send and recv. This is a common source of confusion for people new to MPI: you don't use MPI_Recv() to receive data sent by a broadcast; you use MPI_Bcast().
For MPI collective communications, everyone has to participate; everyone has to call MPI_Bcast. (That's why the MPI_Bcast routine has a parameter that specifies the “root”, or who is doing the sending; if only the sender called MPI_Bcast, you wouldn't need this.) Everyone calls the broadcast, including the receivers; the receivers don't just post a receive.
The reason for this is that collective operations involve everyone in the communication, so you state what you want to happen (everyone gets one process's data) rather than how it happens (e.g., the root process loops over all other ranks and does a send). This leaves the MPI library scope to optimize the communication pattern (e.g., a tree-based hierarchical communication that takes about log2(P) steps rather than P - 1 steps for P processes).
MPI Scatter, Gather, and Allgather
MPI_Scatter
MPI_Scatter is a collective routine that is very similar to MPI_Bcast. MPI_Scatter involves a designated root process sending data to all processes in a communicator. The primary difference between MPI_Bcast and MPI_Scatter is small but important. MPI_Bcast sends the same piece of data to all processes while MPI_Scatter sends chunks of an array to different processes. Check out the illustration below for further clarification.
In the illustration, MPI_Bcast takes a single data element at the root process (the red box) and copies it to all other processes. MPI_Scatter takes an array of elements and distributes the elements in the order of process rank. The first element (in red) goes to process zero, the second element (in green) goes to process one, and so on. Although the root process (process zero) contains the entire array of data, MPI_Scatter will copy the appropriate element into the receiving buffer of each process. Here is what the function prototype of MPI_Scatter looks like.
MPI_Scatter(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator)
MPI_Gather
MPI_Gather is the inverse of MPI_Scatter. Instead of spreading elements from one process to many processes, MPI_Gather takes elements from many processes and gathers them to one single process. This routine is highly useful for many parallel algorithms, such as parallel sorting and searching. Below is a simple illustration of this algorithm.
Similar to MPI_Scatter, MPI_Gather takes elements from each process and gathers them to the root process. The elements are ordered by the rank of the process from which they were received. The function prototype for MPI_Gather is identical to that of MPI_Scatter.
MPI_Gather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator)
In MPI_Gather, only the root process needs to have a valid receive buffer. All other calling processes can pass NULL for recv_data. Also, don't forget that the recv_count parameter is the count of elements received per process, not the total sum of counts from all processes. This can often confuse beginning MPI programmers.
Example: Computing average of numbers with MPI_Scatter and MPI_Gather
At the beginning of the code, the root process creates an array of random numbers. When MPI_Scatter is called, each process receives elements_per_proc elements of the original data. Each process computes the average of its subset of the data, and then the root process gathers the individual averages. The total average is computed on this much smaller array of numbers.
MPI_Allgather
So far, we have covered two MPI routines that perform many-to-one or one-to-many communication patterns, which simply means that many processes send/receive to one process. Oftentimes it is useful to be able to send many elements to many processes (i.e., a many-to-many communication pattern). MPI_Allgather has this characteristic.
Given a set of elements distributed across all processes, MPI_Allgather will gather all of the elements to all the processes. In the most basic sense, MPI_Allgather is an MPI_Gather followed by an MPI_Bcast. The illustration below shows how data is distributed after a call to MPI_Allgather.
Just like MPI_Gather, the elements from each process are gathered in order of their rank, except this time the elements are gathered to all processes. Pretty easy, right? The function declaration for MPI_Allgather is almost identical to MPI_Gather with the difference that there is no root process in MPI_Allgather.
MPI_Allgather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
MPI_Comm communicator)
So the average-computing program above can now be written as:
The partial averages are now gathered to every process using MPI_Allgather, and the final average is printed by all of the processes.
Homework 10: OpenMP Scheduling
A sequential program for matrix multiplication is given here:
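#include <stdio.h>
#include <omp.h>
// Compile with OpenMP enabled, e.g. gcc -fopenmp matmul.c -o matmul
#define N 100 // matrix dimension (N x N); change to 2000 for the homework
// Static (global) storage: 2000 x 2000 doubles are too large for a typical stack.
static double A[N][N], B[N][N], C[N][N];
int main(int argc, char *argv[]) {
    omp_set_num_threads(8); // set number of threads here
    // Initialize the input matrices and zero the output matrix.
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            A[i][j] = j * 1;
            B[i][j] = i * j + 2;
            C[i][j] = 0;
        }
    }
    double start = omp_get_wtime(); // start time measurement, used for timing matrix multiplication
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            double sum = 0;
            for (int k = 0; k < N; k++) {
                sum += A[i][k] * B[k][j];
            }
            C[i][j] = sum;
        }
    }
    double end = omp_get_wtime(); // end time measurement
    printf("Time of computation: %f seconds\n", end - start);
    return 0;
}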
The size of the N x N matrices in the program is set to 100 x 100. Change this to N = 2 × 10^3 (i.e., N = 2000). Please provide a comprehensive report that answers the following questions, and plot figures to show your testing results.
Q1 (0.4 Points): You need to parallelize this matrix multiplication program in two different ways:
1. Add the necessary pragma to parallelize the outer for loop in the matrix multiplication.
2. Remove the pragma for the outer for loop and add the necessary pragma to parallelize the middle for loop in the matrix multiplication.
First, report the number of available cores on your system (e.g., with a screenshot from your testing). For both cases above, please collect timing data with 1, 2, 4 and 8 threads, respectively. You will find that when you run the same program several times, the timing values can vary significantly. Therefore, please add a loop to the code that executes the matrix multiplication 5 times and displays the average time.
Q2 (0.2 Points): Please try three typical scheduling policies in OpenMP (default, static and dynamic scheduling) on the two parallel versions of the code from Q1, with 2, 4 and 8 threads. Please pick the best performance point across all the test cases, and discuss why you observe such performance results.
Q3 (0.2 Points): Is it possible to use OpenMP to parallelize the innermost loop? If so, please write the code snippet in your report and discuss its potential performance issues. If not, please explain why.
Q4 (0.2 Points): Please first improve the sequential matrix multiplication kernel code via both register reuse and cache blocking (tiling) from Lecture_04. Then, parallelize this code using OpenMP to achieve the best execution time. Plot the collected execution time results of the new code on 1, 2, 4 and 8 threads, respectively. For this question, please also provide the OpenMP parallelized code for review.