Distributed Computing (MPI)
Overview
Teaching: 45 min
Exercises: 15 min
Questions
What is Message Passing Interface?
Objectives
Learn how to use MPI to distribute computation, even across nodes.
Introduction
MPI is a specification for the developers and users of message passing libraries. By itself, it is NOT a library, but rather the specification of what such a library should be. There are several implementations of MPI; the most popular today are OpenMPI, Intel MPI, MPICH, and MVAPICH.
Compiling MPI programs
The different implementations of MPI offer wrappers to the compilers. The following table shows how to compile applications with several implementations.
Using OpenMPI, the compile lines are as follows:
Language | Wrapper | Command Example |
---|---|---|
C | mpicc | mpicc [options] main.c |
C++ | mpicxx | mpicxx [options] main.cpp |
Fortran | mpif90 | mpif90 [options] main.f90 |
Using Intel MPI, you can use the commands above if GCC is the underlying compiler, or the following commands for the Intel compilers.
Language | Wrapper | Command Example |
---|---|---|
C | mpiicc | mpiicc [options] main.c |
C++ | mpiicpc | mpiicpc [options] main.cpp |
Fortran | mpiifort | mpiifort [options] main.f90 |
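The wrappers simply call an underlying compiler with the appropriate include and library flags. If you want to inspect the exact command a wrapper would run without actually compiling, OpenMPI wrappers accept the --showme option, while MPICH and Intel MPI wrappers accept -show. For example:
mpicc --showme main.c
mpiicc -show main.c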
Getting started
Let's start with a simple example showing the basic elements of the interface. The program does nothing useful, but it illustrates the basic functions that are common to practically any program that uses MPI. The version in C is:
// required MPI include file
#include "mpi.h"
#include <stdio.h>
int main(int argc, char *argv[]) {
int numtasks, rank, len, rc;
char hostname[MPI_MAX_PROCESSOR_NAME];
// initialize MPI
MPI_Init(&argc,&argv);
// get number of tasks
MPI_Comm_size(MPI_COMM_WORLD,&numtasks);
// get my rank
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
// this one is obvious
MPI_Get_processor_name(hostname, &len);
printf ("Number of tasks= %d My rank= %d Running on %s\n", numtasks,rank,hostname);
// do some work with message passing
// done with MPI
MPI_Finalize();
}
The same program in Fortran 90 follows:
program mpi_env
! required MPI include file
include 'mpif.h'
integer numtasks, rank, len, ierr
character(MPI_MAX_PROCESSOR_NAME) hostname
! initialize MPI
call MPI_INIT(ierr)
! get number of tasks
call MPI_COMM_SIZE(MPI_COMM_WORLD, numtasks, ierr)
! get my rank
call MPI_COMM_RANK(MPI_COMM_WORLD, rank, ierr)
! this one is obvious
call MPI_GET_PROCESSOR_NAME(hostname, len, ierr)
print *, 'Number of tasks=',numtasks,' My rank=',rank,' Running on=',hostname
! do some work with message passing
! done with MPI
call MPI_FINALIZE(ierr)
end program mpi_env
For the purpose of this tutorial we will use OpenMPI. The compilation line for the C version is:
mpicc mpi_env.c -lmpi
For the Fortran version:
mpif90 mpi_env.f90 -lmpi
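By default the wrapper names the executable a.out. If MPI is available on the machine where you compiled (for example inside an interactive session), you can do a quick test; each rank prints one line with its rank and the host it is running on:
mpirun -np 4 ./a.out
On a shared cluster such as Spruce, production runs should instead go through the queue system, as described below.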
Let's review the different calls. The basic difference between the C functions and the Fortran subroutines is how the error code is returned: in C it is the return value of the function, while in Fortran the equivalent subroutine carries an extra output-only argument to store the value.
MPI_Init
Initializes the MPI execution environment. This function must be called in every MPI program, must be called before any other MPI functions and must be called only once in an MPI program. For C programs, MPI_Init may be used to pass the command line arguments to all processes, although this is not required by the standard and is implementation dependent.
MPI_Init (&argc, &argv)
MPI_INIT (ierr)
MPI_Comm_size
Returns the total number of MPI processes in the specified communicator, such as MPI_COMM_WORLD. If the communicator is MPI_COMM_WORLD, then it represents the number of MPI tasks available to your application.
MPI_Comm_size (comm, &size)
MPI_COMM_SIZE (comm, size, ierr)
MPI_Comm_rank
Returns the rank of the calling MPI process within the specified communicator. Initially, each process will be assigned a unique integer rank between 0 and number of tasks - 1 within the communicator MPI_COMM_WORLD. This rank is often referred to as a task ID. If a process becomes associated with other communicators, it will have a unique rank within each of these as well.
MPI_Comm_rank (comm,&rank)
MPI_COMM_RANK (comm,rank,ierr)
MPI_Get_processor_name
Returns the processor name. Also returns the length of the name. The buffer for “name” must be at least MPI_MAX_PROCESSOR_NAME characters in size. What is returned into “name” is implementation dependent - may not be the same as the output of the “hostname” or “host” shell commands.
MPI_Get_processor_name (&name,&resultlength)
MPI_GET_PROCESSOR_NAME (name,resultlength,ierr)
MPI_Finalize
Terminates the MPI execution environment. This function should be the last MPI routine called in every MPI program - no other MPI routines may be called after it.
MPI_Finalize ()
MPI_FINALIZE (ierr)
Compiling and executing an MPI program on Spruce
To compile the program you first need to load the corresponding modules:
module load compilers/gcc/6.3.0 mpi/openmpi/2.0.2_gcc63
To execute an MPI program, it is strongly suggested to run it through the queue system. Assuming that the executable is a.out, a minimal submission script could be:
#!/bin/sh
#PBS -N MPI_JOB
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:05:00
#PBS -m ae
#PBS -q debug
#PBS -n
module load compilers/gcc/6.3.0 mpi/openmpi/2.0.2_gcc63
cd $PBS_O_WORKDIR
mpirun -np 4 ./a.out
This submission script requests 4 cores to run your job on a single node. With MPI you are not constrained to a single node, but it is always a good idea to run the program on cores that are as close to each other as possible.
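Since MPI processes communicate through messages over the network, the same program can span several nodes. Below is a sketch of a two-node request, assuming the same modules and executable; the node and core counts are only illustrative:
#!/bin/sh
#PBS -N MPI_JOB_2NODES
#PBS -l nodes=2:ppn=4
#PBS -l walltime=00:05:00
#PBS -q debug
module load compilers/gcc/6.3.0 mpi/openmpi/2.0.2_gcc63
cd $PBS_O_WORKDIR
mpirun -np 8 ./a.out
Here -np 8 matches the 2 nodes times 4 cores per node requested from the scheduler; when OpenMPI is built with support for the queue system, mpirun picks up the list of assigned nodes from the job environment.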
Sending Messages with Broadcasting
Let's review one classic example of MPI programming. Many functions are computed numerically via series expansions or quadrature. Here we use the inverse tangent to approximate pi: since the integral of 4/(1 + x*x) from 0 to 1 equals 4 arctan(1) = pi, a midpoint-rule sum over n intervals approximates pi, and the terms of that sum can be spread among several cores, giving each of them a subset of values to work on. The C version of the code is:
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include <math.h>
int main(int argc, char *argv[])
{
int n, myid, numprocs, i;
double PI25DT = 3.141592653589793238462643;
double mypi, pi, h, sum, x;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
if (argc < 2) {
n=10;
}
else {
n=atoi(argv[1]);
}
MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);
h = 1.0 / (double) n;
sum = 0.0;
printf(" Rank %d computing from %d up to %d jumping %d\n", myid, myid+1, n, numprocs);
for (i = myid + 1; i <= n; i += numprocs) {
x = h * ((double)i - 0.5);
sum += (4.0 / (1.0 + x*x));
}
mypi = h * sum;
MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
if (myid == 0)
printf("pi is approximately %.16f, Error is %.16e\n",
pi, fabs(pi - PI25DT));
MPI_Finalize();
return 0;
}
The version in Fortran 90 is:
program main
use mpi
double precision pi25dt
parameter (pi25dt = 3.141592653589793238462643d0)
double precision mypi, pi, h, sum, x, f, a
integer n, myid, numprocs, i, ierr
character(len=32) :: arg
! Function to integrate
f(a) = 4.d0 / (1.d0 + a*a)
! Basic initialization calls
call mpi_init(ierr)
call mpi_comm_rank(mpi_comm_world, myid, ierr)
call mpi_comm_size(mpi_comm_world, numprocs, ierr)
if (myid .eq. 0) then
i = 0
n = 10 ! default number of intervals if no argument is given
ierr = 1
do
call get_command_argument(i, arg)
if (len_trim(arg) == 0) exit
i = i+1
if (i .eq. 2) then
read(arg,*,iostat=ierr) n
if (ierr .ne. 0 ) then
write(*,*) 'Conversion failed, assuming n = 10, ierr=',ierr,n
n = 10
end if
end if
end do
write(*,*) 'Splitting with N = ', n
endif
! Broadcast n
call mpi_bcast(n, 1, mpi_integer, 0, mpi_comm_world, ierr)
! Calculate the interval size
h = 1.0d0/n
sum = 0.0d0
write(*,*) 'Rank ', myid, ' computing from ', myid+1, ' up to ', n, ' jumping ', numprocs
do i = myid+1, n, numprocs
x = h * (dble(i) - 0.5d0)
sum = sum + f(x)
enddo
mypi = h * sum
! Collect all the partial sums
call mpi_reduce(mypi, pi, 1, mpi_double_precision, &
mpi_sum, 0, mpi_comm_world, ierr)
! Node 0 prints the answer.
if (myid .eq. 0) then
print *, 'pi is ', pi, ' error is', abs(pi - pi25dt)
endif
call mpi_finalize(ierr)
end program main
The compilation line is:
mpicc mpi_pi.c
or
mpif90 mpi_pi.f90
The executable expects one argument: the number of intervals used in the integration. For example, with n = 10000 and 4 processes, rank 0 evaluates the terms i = 1, 5, 9, ..., rank 1 the terms i = 2, 6, 10, ..., and so on, and MPI_Reduce adds the four partial sums on rank 0.
mpirun -np 4 ./a.out 10000
There are two new functions here:
MPI_Bcast
Data movement operation. Broadcasts (sends) a message from the process with rank “root” to all other processes in the group.
MPI_Bcast (&buffer,count,datatype,root,comm)
MPI_BCAST (buffer,count,datatype,root,comm,ierr)
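Broadcasting is not restricted to a single scalar; any contiguous buffer can be sent with one call. The following minimal C sketch broadcasts three parameters that only rank 0 knows (the variable names and values are made up for illustration):
#include "mpi.h"
#include <stdio.h>
int main(int argc, char *argv[]) {
  int rank;
  double params[3];
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  if (rank == 0) {
    // only the root fills the buffer, e.g. after reading an input file
    params[0] = 0.5; params[1] = 1.0e-6; params[2] = 100.0;
  }
  // after this call every rank holds the same three values
  MPI_Bcast(params, 3, MPI_DOUBLE, 0, MPI_COMM_WORLD);
  printf("Rank %d received %f %f %f\n", rank, params[0], params[1], params[2]);
  MPI_Finalize();
  return 0;
}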
MPI_Reduce
Collective computation operation. Applies a reduction operation on all tasks in the group and places the result in one task.
MPI_Reduce (&sendbuf,&recvbuf,count,datatype,op,root,comm)
MPI_REDUCE (sendbuf,recvbuf,count,datatype,op,root,comm,ierr)
The predefined MPI reduction operations appear below. Users can also define their own reduction functions with the MPI_Op_create routine; a minimal sketch is shown after the table.
MPI Reduction Operation | Description | C Data Types | Fortran Data Types |
---|---|---|---|
MPI_MAX | maximum | integer, float | integer, real, complex |
MPI_MIN | minimum | integer, float | integer, real, complex |
MPI_SUM | sum | integer, float | integer, real, complex |
MPI_PROD | product | integer, float | integer, real, complex |
MPI_LAND | logical AND | integer | logical |
MPI_BAND | bit-wise AND | integer, MPI_BYTE | integer, MPI_BYTE |
MPI_LOR | logical OR | integer | logical |
MPI_BOR | bit-wise OR | integer, MPI_BYTE | integer, MPI_BYTE |
MPI_LXOR | logical XOR | integer | logical |
MPI_BXOR | bit-wise XOR | integer, MPI_BYTE | integer, MPI_BYTE |
MPI_MAXLOC | max value and location | float, double and long double | real, complex, double precision |
MPI_MINLOC | min value and location | float, double and long double | real, complex, double precision |
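As mentioned above, MPI_Op_create lets you register your own combining function and use it exactly like the predefined operations. A minimal C sketch, where the function max_abs and the sample data are invented for illustration, keeps the largest absolute value across ranks:
#include "mpi.h"
#include <stdio.h>
#include <math.h>
// user-defined reduction: element-wise maximum of absolute values;
// it must combine invec into inoutvec, as MPI_Op_create requires
void max_abs(void *invec, void *inoutvec, int *len, MPI_Datatype *datatype) {
  double *in = (double *) invec;
  double *inout = (double *) inoutvec;
  for (int i = 0; i < *len; i++) {
    double a = fabs(in[i]), b = fabs(inout[i]);
    inout[i] = (a > b) ? a : b;
  }
}
int main(int argc, char *argv[]) {
  int rank;
  double local, result;
  MPI_Op myop;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  // register the function; the second argument states the operation is commutative
  MPI_Op_create(max_abs, 1, &myop);
  local = (rank % 2 == 0) ? -(double) rank : (double) rank; // made-up local data
  MPI_Reduce(&local, &result, 1, MPI_DOUBLE, myop, 0, MPI_COMM_WORLD);
  if (rank == 0)
    printf("Largest absolute value across ranks: %f\n", result);
  MPI_Op_free(&myop);
  MPI_Finalize();
  return 0;
}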
Key Points
MPI is the de facto standard for large-scale numerical computing, used whenever a single node, no matter how big, does not have enough cores or memory to run the simulation.