Outline:
Science and engineering computations are now typically made on computers. Computers enable realistic modeling of physical phenomenon, allowing us to examine more than just special cases. e.g., "dusty deck" Fortran codes may represent a core engineering expertise. To improve validity of physical models, we typically add more grid points. For example, we might want a 1K by 1K by 1K grid. If there is one eight byte floating point number for each element of the grid, that would be 8 Gbytes.
One reason to go to parallel computation is just to have sufficient RAM available. Many of the standard finite element packages still are not well parallelized, but nonetheless require lots of memory to model in sufficient detail. The fastest computers now (the ones that solve physical problems with the best resolution) are parallel computers.
The dominant style of parallel computer is an MIMD (Multiple Instruction Multiple Device) computer. The style of programming I'll talk about here is Single Instruction Multiple Device (SIMD). This uses "if this group of nodes, else that group of nodes") constructs so that SENDS, RECEIVES, and synchronization points can all appear in the same code.
If all processors can access the same RAM, the computer is said to be shared memory.
If memory is distributed, message passing is the usual style of programming. Each process accesses memory on other processors by sending and receiving messages. Message passing codes work on both shared and distributed memory machines.
MPI (Message Passing Interface) is a standard. Before MPI (and PVM) it was necessary to rewrite the message passing parts of routines for every new parallel platform that came along.
Considering the great number of "dead" architectures, this meant that by the time code worked well (a year or two) the machine it was written for was almost out of date.
Connection Machines, DAP, BBN, Kendall Square, DEC, Intel Message Passing, Sequent, Cray, SGI ...
MPI programs work on both shared memory and distributed memory machines. So just as we have "dusty deck" Fortran codes, we will have legacy MPI-FORTRAN and MPI-C codes. Hopefully, they will be maintainable? Portably fast?
MPI is a very rich (complicated) library. But it is not necessary to use all the features. An advantage to using a simple subset is these have been optimized by most vendors. Also it makes thinking about programming easy. On the other hand, it is fun to look up other functions and see if they will simplify programming. Often the problem you are having is one addressed by some MPI construct.
There is a set of sample programs. Due mainly to Peter Pacheco from the University of San Francisco. You can find these at Sample Code tar file. There are subdirectories of C and Fortran codes. For information on how to run codes on the NCSU HPC machines, see How To and FAQs.
You can look at the mpi.h, fmpi.h, mpio.h file on your system. Usually these are found in /usr/include.
I'm presenting codes in Fortran. (Argue: Fortran is the subset of C/C++ which is most useful for scientific computing, other features of C, e.g. pointer arithmetic, are likely to slow performance). However, C variants of sample codes are also available in the tar ball pachec.tar. Most of the C and Fortran codes are due to Peter Pacheco and his students at the University of San Francisco. His text, "Parallel Progamming with MPI", Morgan Kaufmann, 1997, is a more complete presentation -- and quite straightforward.
Fortran calling arguments to MPI routines are different than C in that an ierr argument is appended. Where a C function would return an integer zero on successful return, the Fortran subroutine returns ierr as zero instead. Fortran arguments are easier than C in not being sensitive to whether a call is "by reference". For example, a C call to MPI_Comm_Rank (explained further below) could be
ierr = MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);where we have declared p and ierr by
int p, ierr;
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
The corresponding Fortran call is
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
integer p, ierr
In both cases MPI_COMM_WORLD has been declared in an include file.
Whether we use Fortran or C, close attention to types
and orders of arguments to MPI calls is necessary.
SAMPLE EASY CODES
Monte Carlo codes run many instances of a given event. The original Monte Carlo calculations were run to model an H-Bomb. The Russians already had one, how by hook or by crook to model the interactions of a neutron?
Monte Carlo calculations of a given constant (e.g., mean free path of a neutron or area under a curve) have
Error = o(1/sqrt(number of simulations) )
So to get one more digit of accuracy we have to multiply the number of simulations by one hundred. Hence a need for many simulations, i.e, so a need for parallel computation.
The simulations are independent and require little communication, so Monte Carlo codes are known as embarrassingly parallel
(go back to list of files)
FILE: monte.f
c
c Template for a Monte Carlo code.
c
c The root processor comes up with a list of seeds
c which it ships to all processors.
c
c In a real application, each processor would compute
c something and send it back. Here they compute
c a vector of random numbers and send it back.
c
c This version uses a loop of sends to mail out the
c seeds. And uses a loop to send data back to root.
program monte
c
include 'mpif.h'
c
integer my_rank
integer p
integer source
integer dest
integer tag
integer iseed,initseed
integer status(MPI_STATUS_SIZE)
integer ierr
integer i
real*8 ans(10), ans2(10)
real*8 startim,entim
c
c function
integer string_len
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
c
if (my_rank.eq.0) then
print*,'input random seed'
read*,iseed
endif
c------------------------------------------------------------------------
c if the timing results seem peculiar, try uncommenting the next line
c call MPI_BARRIER(MPI_COMM_WORLD,ierr) !
c !
c ! What difference is there?
startim = MPI_Wtime()
if (my_rank.eq.0) then
initseed = int(random()*1000)
c startim = MPI_Wtime()
do dest = 1, p-1 ! CPU 0 loops through p-1 sends
initseed = int(random()*1000)
tag = 0
call MPI_Send(initseed, 1, MPI_INTEGER,
+ dest, tag, MPI_COMM_WORLD, ierr)
c ----------------------------------------------------------------------
c Send message consisting of
c initseed -- arg 1 message sent
c 1 -- arg 2 , length of message
c MPI_INTEGER -- arg 3 , type of data sent
c dest -- arg 4, rank of processor to which message sent
c tag -- arg 5, some integer, needs to be matched by RECV
c MPI_COMM_WORLD -- arg 6, handle of communicator, matched by RECV
c ierr -- arg 7, output from MPI_SEND, will be 0 if successful
c This call can block. Code will not proceed until the message
c has been copied to a buffer, and if buffer size is not large
c enough the code will stall till the message is successfully
c written.
c -----------------------------------------------------------------------
end do
else ! each of p-1 CPUs gets a message from CPU 0
root = 0
tag = 0
call MPI_Recv(initseed, 100, MPI_INTEGER, root,
+ tag, MPI_COMM_WORLD, status, ierr)
c------------------------------------------------------------------------
c Receive message consisting of
c initseed -- arg 1 message sent
c 100 -- arg 2 , maximal length of message
c MPI_INTEGER -- arg 3 , type of data sent
c root -- arg 4, rank of processor which sent message
c (could use a wild card)
c tag -- arg 5, some integer, needs to be matched by SEND
c (could use a wild card)
c MPI_COMM_WORLD -- arg 6, handle of communicator, matched by SEND
c (no wild card allowed)
c status -- arg 7 integer array status(MPI_STATUS_SIZE)
c ierr -- arg 8, output from MPI_SEND, will be 0 if successful
c The receive is blocking. Code will not go to next step until the
c receive is completed.
c------------------------------------------------------------------------
call MPI_Get_count(status, MPI_INTEGER, size, ierr)
c -----------------------------------------------------------------------
c This call tells us how long the passed message came out to be
c Information about the received message is found in status vector
c ----------------------------------------------------------------------
endif ! input phase done
c-----------------------------------------------------------------------
c Left out -- a body of code that does a bunch of particle tracking
c stuff to produce the double precision vector ans
c-----------------------------------------------------------------------
do i=1,10
ans(i) = rand() ! at least we initialize stuff to send back.
end do
if (my_rank.eq.0) then
tag = 1
do source = 1,p-1
call MPI_RECV(ans2, 10, MPI_DOUBLE_PRECISION,
+ source, tag, MPI_COMM_WORLD,status, ierr)
do i=1,10
ans(i) = ans(i) + ans2(i)
end do
end do
else
tag = 1
call MPI_SEND(ans, 10, MPI_DOUBLE_PRECISION, root,
+ tag, MPI_COMM_WORLD, ierr)
endif
if(my_rank.eq.0) then
c do some stuff to process and output ans
endif
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
end
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c
Let's look a bit at the arguments for the send and receive.
MPI_SEND,
1st argument, variable to be sent
2nd argument, integer number of items in variable, here 1
3rd , data type--different Fortran and C bindings !!
4th , dest -- integer rank of processor to which message is sent
5th , tag -- integer message tag
6th , handle for MPI communicator
7th , ierr , integer for error message, this argument not used in C ,
output
MPI_RECV
1st arg, variable to be received (output)
2nd arg, integer of number of variables in message , This is
an upper bound for the receive
3rd arg, data type
4th arg, source -- integer rank of processor sending message
(or could be MPI_ANY_SOURCE)
5th arg, tag -- same integer as matching send,
(or could be MPI_ANY_TAG)
6th arg handle for MPI communicator, must match sending communicator
7th arg status, integer status(MPI_STATUS_SIZE) – output
8th arg ierr, output, 0 for successful return
NOTE: on the NC State systems, we run parallel jobs in batch mode.
The above code can fail because it is "interactive" in requiring
an input value. The input value can be supplied from a file.
For example, we an supply a random seed of "2" by making a file
"inp" containing the single entry 2. Suppose the exectuable
file is "monte". Then inside the LSF job submission file (bsub
file) we could use
mpiexec_hydra ./monte < inpinstead of
mpiexec_hydra ./montewhich would be used if no input were required.
Putting a timer in this code, the time (in chassis for Henry2) for a "ping" (send a message and receive something back) was 8.8e-5 seconds.
The integer status(MPI_SOURCE) tells us the processor rank of the source of the received message. The integer status(MPI_TAG) sends us the tag off the received message. There’s also a value status(MPI_ERROR).
In general, I won't write out the MPI arguments in such detail, but as with any C or Fortran library, keeping good track of subroutine arguments is a first key to successfully using a call.
It is often helpful to write a small program to illustrate and verify the action of the routine. On a given architecture, you need to know the correct data types to match the integers used in MPI calls, e.g. source, root, tag, ierr etc. Typically 4 bytes, but ...
In order to model the running of your code, you may want to time a communication pattern.
How long does it take to start a message? What is the bandwidth for long messages? Would another MPI operator work better?
FORTRAN MPI data types include
MPI_INTEGER INTEGER MPI_REAL (single precision) REAL MPI_DOUBLE_PRECISION DOUBLE PRECISION MPI_CHARACTER CHARACTER(1) MPI_COMPLEX COMPLEX MPI_BYTE MPI_PACKED MPI_LOGICAL LOGICAL
C MPI data types include
MPI_INT MPI_FLOAT MPI_DOUBLE MPI_CHAREtc. (See P. 34 MPI The Complete Reference)
Let's consider instead a ring program. It passes
(go back to list of files)
FILE: ring.f
c ring program
c
c pass messages around a ring.
c Input: none.
c
c See Chapter 3, pp. 41 & ff in PPMPI.
c
program greetings
c
include 'mpif.h'
c
integer my_rank
integer p
integer source, source2
integer dest,dest2
integer tag,tag2
integer root
character*100 message,message2
character*10 digit_string
integer size
integer status(MPI_STATUS_SIZE)
integer ierr
integer i,nreps
real*8 startim,entim
c
c function
integer string_len
nreps = 10000
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
startim = MPI_Wtime()
do i=1,nreps
call to_string(my_rank, digit_string, size)
message = 'Greetings from process ' // digit_string(1:size)
+ // '!'
if (my_rank.ne.p-1) then
dest = my_rank+1
else
dest = 0
endif
if (my_rank.ne.0) then
source = my_rank-1
else
source = p-1
endif
tag = 0 ! message from even processors have an even tag
tag2 = 1 ! messages from odd processors have an odd tag
root = 0
c
c Note this solution only works if the total number of processors is even
c Actually, it turned out to work !!
c
if(my_rank.eq.2*(my_rank/2)) then ! if my_rank is even
call MPI_Send(message, string_len(message), MPI_CHARACTER,
+ dest, tag, MPI_COMM_WORLD, ierr)
call MPI_Recv(message2, 100, MPI_CHARACTER, source,
+ tag2, MPI_COMM_WORLD, status, ierr)
else
call MPI_Recv(message2, 100, MPI_CHARACTER, source,
+ tag, MPI_COMM_WORLD, status, ierr)
call MPI_Send(message, string_len(message), MPI_CHARACTER,
+ dest, tag2, MPI_COMM_WORLD, ierr)
endif
call MPI_Get_count(status, MPI_CHARACTER, size, ierr)
c print*,'my_rank=',my_rank
c write(6,101) message2(1:size),my_rank
101 format(' ',a,' my_rank =',I3)
end do
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
if (my_rank.eq.0)print*,'number of reps =', nreps
end
This took 1.e-4 seconds per processor (on chassis on Henry2). Note: For each send there must be a matching receive.
An MPI_SEND man or may not block (not specified in standard.) Typically, a send will not block if there is adequate space in buffers. To make the code safe and portable, we complicated things by writing if statements for odd and even rank processors. Else we could (if someone wrote an MPI_Send that "blocked" or if the MPI_Send overflowed a buffer) have had a hung code.
MPI is very rich. There are many ways to accomplish this same operation. e.g.
MPI_SENDRECV
(go back to list of files)
FILE: ring2.f
c send messages right around a ring.
c
c This is simpler than ring.f
c This is in that it uses the MPI_SENDRECV operator.
c
c
c Input: none.
c
c See Chapter 3, pp. 41 & ff in PPMPI.
c
program greetings
c
include 'mpif.h'
c
integer my_rank
integer p
integer source, source2, right, left
integer dest,dest2
integer tag,tag2
integer root
character*100 message,message2
character*10 digit_string
integer size
integer status(MPI_STATUS_SIZE)
integer ierr
integer i,nreps
real*8 startim,entim
c
c function
integer string_len
nreps = 10000
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
startim = MPI_Wtime()
do i=1,nreps
call to_string(my_rank, digit_string, size)
message = 'Greetings from process ' // digit_string(1:size)
+ // '!'
if (my_rank.ne.p-1) then
right = my_rank+1
else
right = 0
endif
if (my_rank.ne.0) then
left = my_rank-1
else
left = p-1
endif
root = 0
c Send to the right -- receive from the left.
call MPI_SENDRECV(message, string_len(message), MPI_CHARACTER,
+ right, 0 ,
+ message2, 100 , MPI_CHARACTER,
+ left, 0 ,
+ MPI_COMM_WORLD, status, err)
call MPI_Get_count(status, MPI_CHARACTER, size, ierr)
c print*,'my_rank=',my_rank
c write(6,101) message2(1:size),my_rank
c101 format(' ',a,' my_rank =',I3)
end do
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
if(my_rank.eq.0) print*,' nreps =',nreps
end
| 1st argument : | buffer (variable) to be sent - input |
| 2nd | integer number of items in buffer (think vector) - input |
| 3rd | data type -- different Fortran and C bindings – input |
| 4th | dest -- integer rank of processor to which message is sent - input |
| 5th | tag1 -- integer message tag – input |
| 6th | receive buffer (name unchanged on output contains received data) |
| 7th | integer upper limit on number of received items - input |
| 8th | MPI data type - input |
| 9th | integer source -- rank of processor sending message – input |
| 10th | integer tag of received message – input |
| 11th | name of communicator – input |
| 12th | status integer vector (output) |
| 13th | integer ierr (output) |
There is also an MPI_SENDRECV_REPLACE that uses the same send and receive buffer.
To get the basic MPI set completed, we'll also need global communications, non-blocking communications and ...
Compared to a memory access or to a computation, passing messages is expensive. Passing a message is more like accessing a hard drive. Just as a program that overflows RAM will "thrash", so a program that does fine-grained communication will run very slowly. It is very easy to write parallel programs that run more slowly than serial ones.
An add requires
O(1.e-9) secs in register
O(1.e-8) secs in L2 cache
O(1.e-7) secs in RAM
Other operations.
O(1.e-6 to 1.e-7) secs for a subroutine call--or local MPI call
such as MPI_PACK or MPI_UNPACK
O(1.e-4 to 1.e-5) secs for an MPI_SEND message
O(1.-2 to e-3) secs access data from hard drive
So obviously, we want to make sure that when an add or multiply is be performed that we don't have to wait for a RAM, MPI, or hard drive fetch. So it makes sense to model communication. A simple model is
T_c = (time to start a message) + (bytes in a message)*(time/byte)
Or
T_c = T_s + L * t_b
where T_c is the time to start a message, typically 1.e-5 to 1.e-4 secs depending . t_b is the time to pass a byte is 1.e-8 secs for gigabit ethernet or myrinet. If we want to have the message time T_c in terms of the number of clock cycles or flops we would optimistically have
T_c = 1.e4 + (bytes in a message) * 200
So we have to pass at least 1Kbyte-10Kbytes before the time start the message is less than half the message time. MESSAGES. IT IS VERY EASY TO MAKE CODES RUN SLOWER IN PARALLEL THAN IN SERIAL. THE MOST COMMON BEGINNER ERROR IS TO PASS LOTS OF SMALL MESSAGES
So if we want to make a program slow, all we need do is pass lots of messages.
On the other hand, there is hope. If we can ration the number of messages to be received, then we have a good chance of getting a good parallel efficiency. Thinking about data locality is an intrinsic part of getting good parallel code. It does seem a shame to burden users with it. But of course, it's also what you have to do to get good performance in serial computing. After all, it's less expensive to get data from another processor's RAM than it is to get information from a local hard drive. So if your problem is too big to fit in RAM, you'll probably get better performance by using more than one CPU allotment of RAM.
The discipline of considering data locality also enables more efficient serial code. For example, in serial computing, advertised flop rates are obtained only when data in cache can be reused. Bus speed and cache size are often more important than CPU speed. For accessing data in RAM, we get a formula in clock cycles like
T_a = 200 + (number of bytes) * 50
The computer tries to hide the 200 from you by bringing data into cache in blocks, but if you access the data in the wrong order (e.g., a matrix row wise instead of columnwise), you can get factors of 10 or more slow downs.
I'll present an example for which accessing data from another allows more efficient use of cache memory, (perhaps) enabling superlinear speedup.
Reducing both the total "volume" and the "number" of messages will speed computations.
Successful parallel computations use "local" data for computations, requiring only periodic "global" refreshing, thereby keeping the global message volume small. Physical partitions are laid out to minimize the ratio surface area/ volume.
Arctic bears weigh 1200 pounds, Florida bears weigh 250 pounds. Fully utilize RAM on one processor and just update the data on regions which are influenced by data resident on other processors.
Successful parallel computations find ways to overlap computation and communication. For instance, use non-blocking communications. We'll explore these later.
Not only does one minimize the volume of communication, but also the number of communications should be minimized. Short messages are "packed" into longer messages.
For example, we could pack integer values in a vector, floating point values in another vector, character values in a third vector. Three calls to MPI_PACK can pack the three vectors can be packed into a buffer of type
MPI_PACKED
and sent in one message. Corresponding MPI_UNPACK calls after an MPI_RECV call can unpack the vectors. The initial integer vector can give instructions as to how many elements are to be unpacked.
Example. The following version of the Monte Carlo code packs up data into one message.
(go back to list of files)
FILE: monte3.f
c
c Template for a Monte Carlo code.
c
c The root processor comes up with a list of seeds
c which it ships to all processors.
c
c In a real application, each processor would compute
c something and send it back. Here they compute
c a vector of random numbers and send it back.
c
c This version uses a single BCAST to distribute
c both integer and double precision data. It packs
c integer vectors as well as double precision data
c into an MPI_PACKED buffer.
c This illustrates the use of MPI_PACK and MPI_UNPACK commands.
c
program monte3
c
include 'mpif.h'
c
integer my_rank
integer p
integer source
integer dest
integer tag
integer iseed, initseed, initvec(200)
integer status(MPI_STATUS_SIZE)
integer ierr
integer i , j , root
integer isize, position, kquad, nj
integer itemp(100), buf(4000)
real*8 ans(10), ans2(10) , temp(100)
real*8 startim,entim
c
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
startim = MPI_Wtime()
c
do j = 1,100
if (my_rank.eq.0) then
iseed = 2
initvec(1) = random(iseed)
do i=2,p
initvec(i) = int(random()*1000)
end do
ISIZE = ( 26 + NJ + 2*KQUAD )
POSITION = 0
nj = 3
kquad = 4 ! actually these might have been read from a file
itemp(1) = isize
itemp(2) = kquad
do i=1,100
temp(i) = 1. ! more realistically we would read from a file
end do
CALL MPI_PACK ( ITEMP, 2, MPI_INTEGER, BUF, 4000,
+ POSITION, MPI_COMM_WORLD, IERR)
c This call packs the integer vector itemp of length 2 into buf
c starting at position 0. It increments position.
c --------------------------------------------------------------
c See below to see how to unpack.
c MPI_PACK and MPI_UNPACK are good for reducing the number of
c total calls. These calls will allow us to pass multiple
c messages for the latency of one. They are flexible in
c that the length of the unpack can be part of the message.
c The time of the calls to pack and unpack is not significant
c compared to the time to pass a message.
c
c Disadvantage: On the Compaq AlphaServerSC the packing turned
c out to be pretty loose, i.e., there were empty bytes. So combining
c short messages would speed thing , but long messages might get
c enough longer that they would take more time.
c
c --------------------------------------------------------------
CALL MPI_PACK ( initvec, 10, MPI_INTEGER, BUF, 4000,
+ POSITION, MPI_COMM_WORLD, IERR)
CALL MPI_PACK ( TEMP, 100,
+ MPI_DOUBLE_PRECISION, BUF, 4000,
+ POSITION, MPI_COMM_WORLD, IERR)
endif
c pack up data into one message
root = 0
call MPI_BCAST(BUF,4000,MPI_PACKED,0,
+ MPI_COMM_WORLD,IERR)
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
IF ( my_rank.NE.0 ) THEN
POSITION = 0
C for the unpack, reset position to 0. The unpack order is
c the same as the pack order. But the order of arguments
c is changed.
c The call below unpacks integer vector itemp of length 2 from
c the BUF buffer.
c unpack itemp
CALL MPI_UNPACK(BUF, 4000, POSITION, ITEMP, 2, MPI_INTEGER,
+ MPI_COMM_WORLD,IERR)
isize = ITEMP(1)
kquad = ITEMP(2)
c unpack initvec
CALL MPI_UNPACK (BUF, 4000, POSITION, initvec, 10,
+ MPI_INTEGER, MPI_COMM_WORLD, IERR)
myseed = initvec(my_rank)
c unpack temp
CALL MPI_UNPACK ( BUF, 4000, POSITION, TEMP, 100,
+ MPI_DOUBLE_PRECISION, MPI_COMM_WORLD,IERR)
ENDIF
c-----------------------------------------------------------------------
c Left out -- a body of code that does a bunch of particle tracking
c stuff to produce the double precision vector ans
c-----------------------------------------------------------------------
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
ans1 = rand(myseed)
do i=1,10
ans(i) = rand() ! at least we initialize stuff to send back.
! But this call is something I had to change to get the code to
! run here.
end do
call MPI_REDUCE (ans,ans2, 10, MPI_DOUBLE_PRECISION,
+ MPI_SUM, root, MPI_COMM_WORLD, ierr)
c-------------------------------------------------------
c Get the (sum of) data back
c ans -- arg1 -- message sent from each processor
c ans2 -- arg2 -- result deposited on root -- out
c 10 -- arg3 -- length of message
c MPI_DOUBLE_PRECISION --arg4 - data type
c MPI_SUM -- arg5 -- operation performed by reduce
c root -- arg6 -- reduce deposits answer on root
c same on all processors
c MPI_COMM_WORLD -- arg7 -- all procs must have same communicator
c ierr -- arg8 -- integer error--out (only in Fortran)
c------------------------------------------------------
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
if(my_rank.eq.0) then
c do some stuff to process and output ans2
endif
end do
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
end
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
This took 8.04e-4 seconds including the BCAST and REDUCE (4 CPUs on Henry2 -- one chassis.
The MPI_PACK and MPI_UNPACK calls are quite fast compared to the time to start a communication. The main drawback is that the MPI_PACKED data form may include empty bytes, so that for example a four byte integer might end up occupying 16 bytes in the MPI_PACKED message. So the packing of messages will help with latency but not bandwidth.
The most general MPI data type is the MPI_TYPE_STRUCT -- the most general fixed MPI "derived data type" allows multiple types of data entries. This MPI data structure is loosely modeled on C structs (but allows other data types than just integers). The MPI_TYPE_STRUC may suffer from the same loose packing as the MPI_PACKED structure. For the MPI_STRUCT an MPI_Type_Commit is required as a declaration.
The MPI_Type_Commit operation requires more time than an MPI_PACK but may be worthwhile if the same structure will be used repeatedly. The following data types also require an MPI_Type_Commit as a declaration.
MPI_VECTOR -- elements of a single type, allowing a stride. MPI_INDEXED -- elements of a single type, with a variable indexed stride, e.g., to pass the upper triangular part of a matrix. MPI_TYPE_HVECTOR if we want to compute the stride in bytes.
There is some expense in committing a data type, but it only occurs once, so is worthwhile if the same message type is to be frequently reused. Production codes I've seen used the VECTOR, INDEX, HVECTOR constructs. These are supported in MPI-2 IO. MPI_TYPE_STRUCT Is not.
Another "hacker" option. Some MPI programmers just pack all their integer and double precision data into a single double precision vector and rely on type conversion. And plausibly you could also pack your character data into a double precision vector since you're writing the decode.
Practical limitations of the
T_c = T_s + L * t_b
model are that it neglects:
1) noise in the system (e.g. processors have other tasks such as heartbeat, spare daemons) - T_s is sporadically large.
2) network saturation. Causes T_b to be sporadically large.
Both 1&2 can cause slower communications and motivate use of non-blocking sends and receives to allow overlapping of communication.
An advantage of the
T_c = T_s + L * t_b
model is we can model communication time via pen and paper before writing a large program. For instance we can conclude that laying out matrix data from square blocks will give total message lengths O(n /sqrt(p)) for p processors. Where column blocks might give O(n)
EXERCISE:
Estimate T_c by repeatedly passing short messages, and L by passing long messages. Note MPI comes with a good wall clock timer MPI_Wtime() which returns a double precision number and usually times in microseconds.
startim = MPI_Wtime()
stuff to time, e.g., 1000 repetitions of some MPI call.
entim = MPI_Wtime() - startim
Then if we can track down all the communications and already know how long the computation will take in serial, then we can estimate parallel performance.
EXERCISE:
What parallel computation are you interested in? How long does it take in serial? Can you estimate how much communication is required? Can you predict parallel performance?
A typical 2-D parallel application may have communication volume O(sqrt(volume on a processor)) * log(number of processors) where the computations go as O(volume on a processor) So the total time is
T = O(sqrt(V)*log(P)) + O(V)
and the parallel efficiency is
E = O(V)/(( O(sqrt(V)*log(P) + O(V) )
which can be close to the ideal of one if O(V) sufficiently large compared to O(sqrt(V)). But slowness of communication puts a high constant on O(sqrt(V))
Example: Consider matrix vector multiplication. Suppose we'll multiply an n x n matrix A times an n-vector x and return the vector x to the root processor.
Assume that A is already distributed to all processors (communicating A to all processors would require more time than computing Ax on one processor). In particular, assume that A is distributed with n/p columns per processor.
A = [A_1 | A_2 | ... | A_p ]
so that A_i has n/p columns. Partition x with n/p elements per block x = [x_1 | x_2 | ... | x_p ]
Then
A*x = A_1*x_1 + A_2*x_2 + ... + A_p*x_p
can be performed as
1) scatter x_i to processor i, i=1:p-1 (MPI_SCATTER)
2) In parallel perform w_i <---A_i*x_i , i=1,p-1 -- computations on each node.
3) perform an MPI_REDUCE to add all the w_i to get x on the head node.
Communication costs are mainly for the reduce, which is passing a vector of length n, log(p) times, i.e., 8*n*log(p)*t_b compared to 2*n*n/p flops for each of the A_i*x_i
so if t_b is flops/byte (how many flops can be performed in the time it takes to pass an additional byte of a message) the parallel efficiency should be
E = (2*n*n/p)/ (2*n*n/p + 8*n*log(p)*t_b)
= (n/p) / (n/p + 4*log(p) * t_b)
E will be near one if n/p >> 4 * log(p) * t_b
i.e., we should have a problem size (for good parallel efficiency with column blocking)
n >> p * ( 4 * log(p) * t_b)
where the DOD tries to keep t_b at about 200.
For the column blocking scheme analyzed so far, the total volume of communication is O(n). We can reduce that to parallel O(n/sqrt(p)). by partitioning the matrix into square blocks of size n/sqrt(p), so that the work per processor is the same. Then we get
E = (2*n*n/p) / (2*n*n/p + 4*n*log(p)*t_b/sqrt(p) )
i.e., to get E near one we should have (for the case that communications don't interfere with each other, and that the matrix in the matrix vector multiply is dense)
n >> sqrt(p) * (2 * log(p) * t_b)
To keep E fixed as the number of processors grows we see we need to grow the problem size n. In this case, as we hold the problem size $n/sqrt(p)$ per processor fixed, the communication only grows as log(p) so we would say the computation is scalable.
As we've already seen in some examples -- Having defined a communicator, (set of processors) some common patterns of communication have special MPI commands.
One example is MPI_BCAST, which broadcasts a message from a specified processor to all other processors. We could implement this (as in the prototype Monte Carlo code) by
if (my_rank.eq.0) then
do i = 1,p - 1
call MPI_SEND ..(missing args)
end do
else
call MPI_RECV .. (missing args)
endif
but this would require a time proportional to the number p of processors. Whereas, if we implement a "fan-out" algorithm, the cost would go like log p.
By using the MPI_BCAST program, we save the bother. The standard mpich library implements "fan-out". And typically, on a given architecture, this is an algorithm someone has optimized.
call MPI_BCAST( ) same arguments on each processor.
Another common operation is a gather. Each processor contributes some entries to a vector.
call MPI_GATHER( )
call MPI_SCATTER( )
Instead of broadcasting an entire vector, send the first k entries to a first processor, the next k a second, etc. For example, in the monte program, we could have a vector of seeds, so instead of having a loop with matched sends and receives, the root processor would have a single MPI_SCATTER call. Each of the other nodes also would require the same call. If each processor is to get a different number of entries, we can use
call MPI_SCATTERV( )
To collect a different number of entries from each processor
call MPI_GATHERV
Another useful operation is a reduce.
call MPI_REDUCE( reduce_op)
If the reduce_op is addition, a sum of entries (or a sum of vectors) would be deposited on the root processor. Other reduction operations are min, max, max_loc, min_loc, and multiplication. Here's the Monte code redone using an MPI_SCATTER and an MPI_REDUCE
(go back to list of files)
FILE: monte2.f
c Template for a Monte Carlo code.
c
c The root processor comes up with a list of seeds
c which it ships to all processors.
c
c In a real application, each processor would compute
c something and send it back. Here they compute
c a vector of random numbers and send it back.
c
c This version uses a scatter to distribute seeds
c and a reduce to get a sum of distributed data
c
program monte2
c
include 'mpif.h'
c
integer my_rank
integer p
integer source
integer dest
integer tag, root
integer iseed,initseed,initvec(200)
integer status(MPI_STATUS_SIZE)
integer ierr
integer i
real*8 ans(10), ans2(10)
real*8 startim,entim
c
c function
integer string_len
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
c
if (my_rank.eq.0) then
print*,'input random seed'
read*,iseed
initvec(1) = random(iseed)
do i=2,p
initvec(i) = int(random()*1000)
end do
endif
c------------------------------------------------------------------------
c if the timing results seem peculiar, try uncommenting the next line
c call MPI_BARRIER(MPI_COMM_WORLD,ierr) !
c !
c ! What difference is there?
startim = MPI_Wtime()
c No need for different call on root and elsewhere
root = 0
call MPI_SCATTER(initvec, 1, MPI_INTEGER,
+ myinit,1,MPI_INTEGER,
+ root, MPI_COMM_WORLD, ierr)
c ----------------------------------------------------------------------
c Scatter message consisting of
c initvec -- arg 1 message sent
c 1 -- arg 2 , length of message to each processor
c MPI_INTEGER -- arg 3 , type of data sent
c myinit -- arg 4, message received on each processor -- output
c 1 -- arg 5, length of message received on each processor
c MPI_INTEGER -- arg 6, type of data received
c root -- arg 7, must be identical for all processors
c origin of message
c MPI_COMM_WORLD -- arg 8 , must be same on all processors.
c ierr -- error info, only in Fortran -- output
c This call may or may not be blocking, not specified in standard.
c -----------------------------------------------------------------------
c-----------------------------------------------------------------------
c Left out -- a body of code that does a bunch of particle tracking
c stuff to produce the double precision vector ans
c-----------------------------------------------------------------------
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
ans1 = rand(myinit)
do i=1,10
ans(i) = rand() ! at least we initialize stuff to send back.
end do
call MPI_REDUCE (ans,ans2, 10, MPI_DOUBLE_PRECISION,
+ MPI_SUM, root, MPI_COMM_WORLD, ierr)
c-------------------------------------------------------
c Get the (sum of) data back
c ans -- arg1 -- message sent from each processor
c ans2 -- arg2 -- result deposited on root -- out
c 10 -- arg3 -- length of message
c MPI_DOUBLE_PRECISION --arg4 - data type
c MPI_SUM -- arg5 -- operation performed by reduce
c root -- arg6 -- reduce deposits answer on root
c same on all processors
c MPI_COMM_WORLD -- arg7 -- all procs must have same communicator
c ierr -- arg8 -- integer error--out (only in Fortran)
c
c------------------------------------------------------
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
if(my_rank.eq.0) then
c do some stuff to process and output ans2
endif
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
end
c
c
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c
An MPI_ALLREDUCE will deposit the "sum" on all processors in the communicator. This took 2.52e-4 seconds per iteration (four processors) .
MPI_ALL_GATHER will perform the gather” onto all processors in the communicator.
Example Matrix vector multiplication, use row blocks, bcast vector to all. then gather the answer use column blocks, scatter x, use reduce to get y on root.
Example, gemvt in the next section.
One complication of global communications: It is not specified in the standard whether they should be blocking If you want to ensure that a global communication is completed before the program proceeds, a safe way is call MPI_BARRIER(communicator)
Collective communications valid only within a communicator.
Multiple Communicators
MPI defines communicators. A motivation is to provide a context so that library functions can be secure and non-conflicting. Global communications are easy to use and efficient. They work only within a communicator. One way to provide multiple communication groups is to provide multiple communicators.
MPI_COMM_SPLIT
allows a communicator to be split into sub-communicators. We can define structure to a communicator by using topologies. Example, Cartesian topologies.
Example.
(go back to list of files)
FILE: p2gemv.f
c REVISED GARY HOWELL--to do parallel gemvt -- October 6, 2003.
C
C Input: matrix A, vector x, currently hardwired.
C Output: results of calls to various functions testing topology
C creation
c
c This one will do paired matrix vector multiplications using
c the square topology (gemvt).
c
C Limitations of code so far:
c 1) each processor has to have the same size matrix
c 2) block size kb is hardwired at 100
c 3) the number of processors total must be a perfect square.
C
C Note: Assumes the number of process, p, is a perfect square
C
C See Chap 7, pp. 121 & ff in PPMPI
C
PROGRAM PGEMVT
INCLUDE 'mpif.h'
integer lda
parameter (lda=5000)
integer p,m,n,k,nb,i,j
real preal
real a(lda,lda),x(lda),y(lda),z(lda),w(lda)
integer my_rank
integer q
integer grid_comm
integer dim_sizes(0:1)
integer wrap_around(0:1)
integer reorder
integer coordinates(0:1)
integer my_grid_rank
integer grid_rank
integer free_coords(0:1)
integer row_comm
integer col_comm
integer row_test
integer col_test
integer ierr
real*8 entim,start
C
C
m = 4000 ! these are the local matrix size
n = 4000
k = 40
reorder = 1
call MPI_INIT( ierr)
call MPI_COMM_SIZE(MPI_COMM_WORLD, p, ierr )
call MPI_COMM_RANK(MPI_COMM_WORLD, my_rank, ierr )
C
preal = p
q = sqrt(preal)
C
dim_sizes(0) = q
dim_sizes(1) = q
wrap_around(0) = 0
wrap_around(1) = 0
call MPI_CART_CREATE(MPI_COMM_WORLD, 2, dim_sizes,
+ wrap_around, reorder, grid_comm, ierr)
C
call MPI_COMM_RANK(grid_comm, my_grid_rank, ierr)
call MPI_CART_COORDS(grid_comm, my_grid_rank, 2,
+ coordinates, ierr)
C
call MPI_CART_RANK(grid_comm, coordinates, grid_rank,
+ ierr)
C
free_coords(0) = 0
free_coords(1) = 1
c call MPI_CART_SUB(grid_comm, free_coords, row_comm, ierr)
call MPI_COMM_SPLIT(MPI_COMM_WORLD,coordinates(0),my_rank,
+ row_comm,ierr)
if (coordinates(1) .EQ. 0)then
row_test = coordinates(0)
else
row_test = -1
endif
call MPI_BCAST(row_test, 1,MPI_INTEGER, 0,row_comm, ierr)
c print*,'after first cartsub',row_test,my_rank
free_coords(0) = 1
free_coords(1) = 0
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
c call MPI_CART_SUB(grid_comm, free_coords, col_comm, ierr)
call MPI_COMM_SPLIT(MPI_COMM_WORLD,coordinates(1),my_rank,
+ col_comm,ierr)
if (coordinates(0) .EQ. 0)then
col_test = coordinates(1)
else
col_test = -1
endif
call MPI_BCAST( col_test, 1, MPI_INTEGER,0, col_comm,ierr)
c print*,'after second cartsub',col_test,my_rank
c call MPI_BARRIER(MPI_COMM_WORLD,ierr)
c
c A more general matrix input would be useful
c
c The global matrix has entries a(i,j) = i-j
c the global x vector has entrix x(i) = i
c print*,'my_rank,9 and 1',my_rank,coordinates(0),coordinates(1)
do j=1,n
do i=1,m
a(i,j) = coordinates(0)*m+i - (coordinates(1)*n+j)
c print*,'a(i,j),my_rank=',a(i,j),i,j,my_rank
end do
y(i) = 0.0
end do
do i=1,m
x(i) = coordinates(0)*m + i
c print*,'x(',i,')=',x(i),'my_rank=',my_rank
w(i) = 0.0
end do
if(my_rank.eq.0) then
start = MPI_Wtime()
endif
do ii=1,10
c Each processor belongs to a column communicator, so will use that
c communicator to help with the global matrix vector product x^T A_i
c looping through the column blocks A_i for the local matrix A
kb = n/k
c print*,'kb =',kb
do i=1,m
x(i) = coordinates(0)*m + i + ii/10.
c print*,'x(',i,')=',x(i),'my_rank=',my_rank
w(i) = 0.0
end do
do i=1,n/kb
call sgemv('T',m,kb,1.0,a(1,(i-1)*kb+1),
+ lda,x,1,1.0,y((i-1)*kb+1),1 )
call MPI_ALLREDUCE(y((i-1)*kb+1),z((i-1)*kb+1),
+ kb,MPI_REAL,MPI_SUM,col_comm,ierr)
c call MPI_BARRIER(MPI_COMM_WORLD,ierr)
c call MPI_BCAST(z((i-1)*kb+1),
c + kb,MPI_REAL,0,col_comm)
call sgemv('N',m,kb,1.0,a(1,(i-1)*kb+1),
+ lda,z((i-1)*kb+1),1,1.0,w,1)
end do
c Take care of extra columns (for the case that k does not divide n exactly
call sgemv('T',m,n-(n/kb)*kb,1.0,a(1,(i-1)*kb+1),
+ lda,x,1,1.0,y((i-1)*kb+1),1 )
call MPI_ALLREDUCE(y((i-1)*kb+1),z((i-1)*kb+1),
+ n-(n/kb),MPI_REAL,MPI_SUM,col_comm,ierr)
call sgemv('N',m,n-(n/kb)*kb,1.0,a(1,(i-1)*kb+1),
+ lda,z((i-1)*kb+1),1,1.0,w,1)
c Column communicator operations are done, next do the row communicator
c operation to sum up the local Aw's to get the global x
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
c do j=1,n
c print*,'z(',j,')=',z(j),my_rank
c print*,'y(',j,')=',y(j),my_rank
c end do
call MPI_ALLREDUCE(w,x,m,
+ MPI_REAL,MPI_SUM,row_comm)
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
c After the scatter reduce, each row communicator has the necessary local x
c to start over (or to do local updates)
end do
call MPI_BARRIER(MPI_COMM_WORLD,ierr)
if(my_rank.eq.0)then
entim = MPI_Wtime()-start
print*,entim
c do i=1,m
c print*,'x(',i,')=',x(i)
c end do
endif
c if(my_rank.eq.3)then
c entim = MPI_Wtime()-start
c print*,entim
c do i=1,m
c print*,'x(',i,')=',x(i)
c end do
c endif
call MPI_FINALIZE(ierr)
end
Yesterday we considered collective communication. It has the advantage that the same call can be used on all processors in a communicator. And the communications take advantage of “roll-in”, “roll-out” patterns of communication so that the time to communicate to p processors goes like log(p). If we need different groups of processors, we can split an existing communicator into sub communicators. Then on each smaller communicator, we can use collective communication routines.
REMEMBER: collective communications may not be blocking. If we want to make sure that the communication has completed, put an MPI_BARRIER call after the collective communication call.
The first day we did point to point communication. If we want to communicate to p processors, these are inefficient, (time is O(p)) but they are the method of choice for communicating to a few processors.
So far, we considered some ways to do communication between adjacent processors
paired MPI_SENDs and MPI_RECVs (sending message around a ring)
if (my_rank is even) then
call MPI_Send(message, string_len(message), MPI_CHARACTER,
+ dest, tag, MPI_COMM_WORLD, ierr)
call MPI_Recv(message2, 100, MPI_CHARACTER, source,
+ tag2, MPI_COMM_WORLD, status, ierr)
else
call MPI_Recv(message2, 100, MPI_CHARACTER, source,
+ tag, MPI_COMM_WORLD, status, ierr)
call MPI_Send(message, string_len(message), MPI_CHARACTER,
+ dest, tag2, MPI_COMM_WORLD, ierr)
endif
complicated. The wrong order would cause the code to hang.
or simpler MPI_SENDRECV
c Send to the right -- receive from the left.
call MPI_SENDRECV(message, string_len(message), MPI_CHARACTER,
+ right, 0 ,
+ message2, 100 , MPI_CHARACTER,
+ left, 0 ,
+ MPI_COMM_WORLD, status, err)
call MPI_Get_count(status, MPI_CHARACTER, size, ierr)
which at least won’t block as easily. It is blocking. Meaning that code lines after this will not be executed until after this operation is completed.
We may prefer nonblocking communications. They allow us to have the computer do something else while the communication completes. Also we don’t have to worry (so much) about the order of sends and receives.
One way is to do ISENDs and IRECVs.
(go back to list of files)
FILE: nonblock.f
c ring program
c
c pass messages "right" around a ring.
c
c
c Input: none.
c
c This version uses nonblocking sends. This is
c convenient in that we don't have to keep
c track of the order of the sends.
c Buffer overflow would be possible, so this
c method is not "safe"
c
c See Chapter 3, pp. 41 & ff in PPMPI.
c
program greetings
c
include 'mpif.h'
c
integer my_rank
integer p
integer source, source2
integer dest, dest2
integer left, right
integer tag, tag2
integer root
character*100 message,message2
character*10 digit_string
integer size
integer status(MPI_STATUS_SIZE)
integer ierr, request
integer i
real*8 startim,entim
c
c function
integer string_len
c
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
startim = MPI_Wtime()
do i=1,1
call to_string(my_rank, digit_string, size)
message = 'Greetings from process ' // digit_string(1:size)
+ // '!'
if (my_rank.ne.p-1) then
right = my_rank+1
else
right = 0
endif
if (my_rank.ne.0) then
left = my_rank-1
else
left = p-1
endif
tag = 0 ! message from even processors have an even tag
tag2 = 1 ! messages from odd processors have an odd tag
root = 0
call MPI_ISEND(message, string_len(message), MPI_CHARACTER,
+ right, 0, MPI_COMM_WORLD, request ,ierr)
call MPI_IRECV(message2, 100, MPI_CHARACTER, left,
+ 0, MPI_COMM_WORLD, request ,ierr)
call MPI_WAIT(request,status,ierror)
call MPI_Get_count(status, MPI_CHARACTER, size, ierr)
c print*,'my_rank=',my_rank
write(6,101) message2(1:size),my_rank
101 format(' ',a,' my_rank =',I3)
end do
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
end
Passing messages only to the right took 1.e-4 seconds per message on Henry2, vs. 1.7e-5 seconds on an AlphaSC.
FILE: persist3.f
c ring program
c
c pass messages "right and left" around a ring.
c
c
c Input: none.
c
c This version uses nonblocking sends. This is
c convenient in that we don't have to keep
c track of the order of the sends.
c Buffer overflow would be possible, so this
c method is not "safe"
c
c See Chapter 3, pp. 41 & ff in PPMPI.
c
program greetings
c
include 'mpif.h'
c
integer my_rank
integer p
integer source, source2
integer dest, dest2
integer left, right
integer tag, tag2
integer root
character*100 message,message2,message3
character*10 digit_string
integer size,size2
integer status(MPI_STATUS_SIZE),status2(MPI_STATUS_SIZE)
integer ierr, request,request2,request3,request4
integer i,nreps
real*8 startim,entim
c
c function
integer string_len
c
nreps = 10000
call MPI_Init(ierr)
c
call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierr)
call MPI_Comm_size(MPI_COMM_WORLD, p, ierr)
startim = MPI_Wtime()
call to_string(my_rank, digit_string, size)
message = 'Greetings from process ' // digit_string(1:size)
+ // '!'
if (my_rank.ne.p-1) then
right = my_rank+1
else
right = 0
endif
if (my_rank.ne.0) then
left = my_rank-1
else
left = p-1
endif
c This call initiates the required sends and receives. Note all persistent requests are
c nonblocking.
call MPI_SEND_INIT(message, string_len(message), MPI_CHARACTER,
+ right, 0, MPI_COMM_WORLD, request ,ierr)
call MPI_RECV_INIT(message2, 100, MPI_CHARACTER, left,
+ 0, MPI_COMM_WORLD, request2 ,ierr)
call MPI_SEND_INIT(message, string_len(message), MPI_CHARACTER,
+ left, 1, MPI_COMM_WORLD, request3 ,ierr)
call MPI_RECV_INIT(message3, 100, MPI_CHARACTER, right,
+ 1, MPI_COMM_WORLD, request4 ,ierr)
c One easy refinement -- put the requests as req(1), req(2), req(3), req(4)
c Then the start calls become CALL MPI_STARTALL(4,req,ierr)
c and the wait call become CALL MPI_WAITALL(4,req,ierr)
do i=1,nreps
call MPI_START(request,ierr)
call MPI_START(request2,ierr)
call MPI_WAIT(request,status,ierror)
call MPI_WAIT(request2,status,ierror)
c call MPI_Get_count(status, MPI_CHARACTER, size, ierr)
call MPI_START(request3,ierr)
call MPI_START(request4,ierr)
call MPI_WAIT(request3,status2,ierror)
call MPI_WAIT(request4,status2,ierror)
c call MPI_Get_count(status2, MPI_CHARACTER, size2, ierr)
c print*,'my_rank=',my_rank
c write(6,101) message2(1:size),my_rank
c write(6,101) message3(1:size2),my_rank
101 form t(' ',a,' my_rank =',I3)
end do
entim = MPI_Wtime() - startim
c
call MPI_Finalize(ierr)
print*,' elapsed time =',entim, ' my_rank=',my_rank
if(my_rank.eq.0) print*,'number of reps =',nreps
end
The refined version returns to the 5 microsecond time on the HP SCs. And is (as are all persistent requests) nonblocking. This is also the "champ" on henry2, 6.6e-5 seconds (or 66 microseconds), on chassis. This is for paired messages.
On the AlphaSC I was able to get lower latencies with the shmem library. This library lets us use one-sided communication. Shmem is available for SGIs, Crays, and machines with a quadrics switch. Latency there was about 3 microseconds with a barrier between repeated pings.
Shmem was one of the motivations for MPI-2.
MPI is a rich library. Son of MPI (MPI-2) is even richer. The standard was published around 1995. Only now are complete implementations starting to appear. Some of its functionality is available on the cluster here.
A primary motivation of MPI-2 was to include features available in other libraries. (A reservation to MPI-2. It may make MPI so feature rich that like Ada or PL-2 no one has the energy to use it all, or to provide implementations that do all MPI well.)
Some rationales for MPI-2
PVM could spawn processes and provide links between already existing parallel computations. PVM provides standard interfaces for heterogeneous networks (networks consisting of more than one machine architecture – for example Sun Solaris, Linux , and IBM AIX machines)
Shmem efficiently imitates shared memory by allowing computers to get and put onto other processors.
I/O. MPI ignored I/O – global and local access to files on hard drives. This is a significant part of making parallel codes efficient.
So before we’ve completely forgotten about passing messages between adjacent processors, consider one sided communications.
An example of one-sided communication for a “ping”.
Code fragment (from MPI-The Complete Reference Vol. 2)
Call MPI_TYPE_SIZE(MPI_REAL, sizeofreal, ierr)
Call MPI_WIN_CREATE(Blocal, m*sizeofreal, sizeofreal, &
MPI_INFO_NULL, comm, win, ierr)
! returns the communication window win.
Call MPI_WIN_FENCE(0,win,ierr)
Do i=1,m
J = mapLocal(i)/p
K = MOD(maplocal(i),p) ! mod function
CALL MPI_GET(Alocal(i), 1, MPI_REAL, j, k ,1 MPI_REAL, win, ierr)
END DO
CALL MPI_WIN_FENCE(0, win, ierr)
CALL MPI_WIN_FREE(win,ierr)
The MPI_GETs are not blocking. The MPI_WIN_FENCE enforces synchronization.
How well does this work?
I compared times for doing a “ping” with (HP AlphaServer SC with a quadrics switch).
78.8 microSecs (averaged over 10K repetitions) for
MPI_Fence MPI_Put MPI_Fence
44.34 microSecs (averaged over 10K repetitions) for
MPI_Fence MPI_Put
5.47 microSecs (averaged over 10K repetitions)
MPI_Put
So in this case the expensive operation is the synchronization.
Results of comparing various “pings” on a quadrics switch on The SC Alpha Server SC40 at ERDC, one of the five fastest machines in the world is of this type. If you develop good code you can benchmark on the 3000 CPU machine at Pittsburgh SuperComputing.
Comparisons of latency T_s for various calls on an SC40
An incomplete table of latencies.
| Shmem | 3 micro secs With synchronization | |
| ( Exists here on the p690) | ||
| The rest are for calls developed by the MPI committee | ||
| Send Recv paired | (blocking) | 98 micro secs |
| Send Recv paired | 5 micro secs (blocking) | 180 micro secs |
| Send Recv as one command | 5 micro secs (blocking) | 100 |
| MPI_GET one sided with no synchronization | 5 micro secs | |
| Persistent blocking | 5 micro secs | 65 |
| Nonblocking with Wait after | 17 micro secs | 100 |
| One sided with synchronization | 44 micro secs | |
| Broadcast followed by reduce | 250 |
I haven’t worked out a consistent table of software environment vs. chassis locality vs. MPI call, but will update this page as I have more faith in the data. The following is plausible for henry2
T_c = 1.e-4 + bytes * (2.5e-8) seconds
Assuming one flop requires 1.e-9 seconds, the message cost is equivalent to
T_c = 1.e5 + (number of bytes) * 25 flops
Or
T_c = 1.e5 + (number of integers) * 100 flops
Or
T_c = 1.e5 + (number of doubles) * 200 flops The message length for which half of the time is spent waiting is About 1.e5/25 = 4000 bytes = 800 integers = 400 doubles.
So unless your messages are at least this long, most of the message passing time will be in the waits for the messages to start.
For messages longer than this we would probably not use MPI_PACKED format (as it may not be tightly packed so wastes bandwidth).
For messages shorter than this, combining messages into MPI_PACKED is probably a good idea.
We have concentrated on communication between processors. In practice, one of the main communication problems is in disk I/O The program and initial data are written somewhere on hard drives and must be transported to the computational nodes. Results must be written to disk.
Again, we can give an initial model for the time for “passing a message” as
T_c = T_s + (length of message in bytes) * T_b
This comes out to (in seconds)
T_c = 1.e-2 + (length of message in bytes) * 5.e-8
Or in flops,
T_c = 1.e7 + (length of message in bytes) * 50
(assuming disk access time of 10 milliseconds and read write bandwidth of 20 Mbytes/sec, and that a flop take 1.e-9 seconds).
1) A main difference here is the disk access time.
2) The other main complication: most parallel computers have a single image file system. This is convenient in that we can read and write from any processor to a file which will be globally accessible.
But since the file is globally accessible, reads and writes are intrinsically serial. If many different users are reading and writing to the same file system, there may be a good deal of contention. (In contrast, communication between processors is relatively free of contention).
Due to 1) and 2), latency hiding is even more important than in usual message passing.
The main trick to “latency hiding” is caching I/O data in RAM. This is accomplished either by the system or by the user.
System caching of data
The system uses RAM on either the file server node or on the computational node, or both as a buffer (cache). This is invisible to the user until the capability is taken away.
Generally, it’s a good idea to read and write relatively large blocks of data. One code I saw recently attempted to minimize the total number of bytes read. Each processor read a few bytes and used the information from those bytes to jump to another point in the file and then read a few more bytes.
The code I/O times were proportional to the number of processors used. For 128 processors, I/O times went to ten hours. This was with a high performance file system that could read or write 80 Mbytes/sec by striping its writes across a number of RAID boxes. That (PFS) file system accomplishes a high bandwidth by establishing a direct connection from the computational node to the file server node. The “chatter” to set up the direct connection is a bit expensive. 700 connections can be made per second. If each connection delivers only one number, then 700 numbers per second are read or written (as opposed to the peak rate of 10Mbytes?)
Then to get a permanent record we have to write out results.
Writes should be made periodically. If many processors are used for long enough periods of time, it’s likely one will fail, so codes to be truly scalable should occasionally back up data in such a way that the code can be restarted. This is called a “checkpoint restart”.
A typical write rate to a RAID box (collection of four or five disk drives with a parity check disk so that any one disk can go out without a loss of data) may have a rate of 20 Mbytes/sec.
The most common file system is NFS (network file system). It is mature and robust. It accomplishes RAM buffering. So the user can for example, write a number at a time to a file and not see a drastic performance hit.
It has some problems with scalability. It’s the method used on the cluster file system on henry2.
NFS allows memory mapping and file locking, among other features. Since NFS is so universal, MPI-2 I/O operations are likely to work. Because modern versions of NFS cache data, write (and often read) rates are fairly independent of write size. Using NFS instead of PFS, the I/O time was reduced from ten hours to ten minutes.
In practice, the write is to cache (local RAM) or to caches on the file server node. As soon as data has been copied to a RAM buffer, the write or printf statement returns control to the program and executes the next statement.
A downside of caching data is that cached data is not yet actually saved to hard drive so can be lost. In order to ensure data is saved we need to demand a disk synchronization. Calling the C function fdatasync is one way to ensure data have been written.
User Handling of I/O
The most common user optimization is to lump data so that it can be transferred in a few blocks, with rather little synchronization between computational nodes.
For portability, users should read and write in large chunks. In Fortran 90, one can specify records and sizes and can fill the records before writing them. In C, fopen statements can be immediately followed by setvbuf statements specifying large buffer sizes. Compiler options sometimes allow enlarged buffers.
MPI-2 IO allows nonblocking reads and writes. So the user can start a read or a write, and then at some other point in the code (just before the data is used) specify a test that the read or write is complete.
For example, CALL MPI_FILE_OPEN( ) returns integer file handle fh CALL MPI_FILE_SET_VIEW( ) tells how to look at file
CALL MPI_FILE_IREAD( fh, buf1,bufsize,MPI_REAL, req, ierr ) would prefetch data and then the code could do some things and eventually an …
… CALL MPI_WAIT(req,status,ierr) ! synchronization call CALL MPI_FILE_CLOSE(fh, ierr) ! release the file handle.
Question: how to use MPI-2 IO calls when they are file system dependent? One possibility is to write your intent in MPI-2 calls and then make system dependent alternatives.
User problems caused by system caching – solution to have one processor make the reads and writes and then communicate to others by MPI?
Problem: If many processors access the same file there can be delays since files may not be actually be written to disk (the data is lurking somewhere in a cache. Another process can’t actually access the disk till the write is complete).
One user accomplished check point restarts by writing to a file from one processor, then handing off the read to another processor, looping through all processors. The hand-offs turned out to be slow, requiring about 30 seconds each to flush the caches and lock the files. Again this was a “feature” of the high performance file striping file system. As the number of processors grew, the hourly checkpointing occupied an excessive amount of time (e.g., half an hour when he used 64 processors).
In his case, he found it faster to designate one processor as the communication node. He used MPI messages to transfer data to the root processor, which then accomplished the write to disk.
Some other User tricks
Recall that using the global file system is causing contention. Essentially everyone is trying to read and write to the same file system, so delays can result. Users can write to the local file system and then have a background copy to the global file system. Since access to the local file system is lost when the parallel job completes, the parallel job can’t complete till copies from the local to global file system have been made. A portability issue is that sizes of local file systems are highly variable.
It is possible to have pre and post I/O operations. For example, one processor can assemble the input for each processor into an individual file. Then all processors can just read their own file. Or each processor can just write to its own file, and then later on a postprocessing job combines those files into one data file.
Portability Issues Not Addressed
Various machine resources can be exceeded. Buffer sizes. Number of allowed open messages. Disk sizes on local hard drives.
Also we haven’t done much with debugging and profiling.
MPI is just one of the available parallel libraries. It was designed to allow development of other parallel libraries (partly by having messages that do not interfere with messages in other communicators). Some other libraries are:
Lapack and BLAS (Basic Linear Algebra Subroutines) give efficient matrix computations on single processors. These are part of the Intel Math library (which exists on henry2).
SCALAPACK is a parallel version of LAPACK. At least part of SCALAPACK is in the Intel Math Library.
Some other libraries which should be installed are SuperLU (uses “direct” LU decompositions to solve Ax=b, for the case that A is sparse). PARPACK, used to find eigenvalues of sparse matrices. pARMS – iterative solution sparse matrix equations. PETSc – extensible package for scientific computation.
For some others, see the NERSC parallel repository.
You may find that some licensed software will be useful to you and others. If so, please tell us.
All of these are finite element codes which can be applied to a number of physical situations. And TotalView as a debugger.
Conclusions? Please send your ideas and concerns to gwhowell@ncsu.edu or eric_sills@ncsu.edu