Distributed parallel programming in Python: mpi4py

1 Introduction

MPI stands for Message Passing Interface. An implementation of MPI, such as MPICH or Open MPI, provides a platform for writing parallel programs on a distributed-memory system such as a Linux cluster. Such a platform generally supports programming in C against the MPI standard, so to run parallel programs in this environment from Python we need a module called mpi4py, which stands for "MPI for Python". The module provides standard functions for tasks such as getting the rank of a process and sending and receiving messages or data between the nodes of the cluster, so that a program can run in parallel with messages passed between nodes. Both an MPI implementation (such as MPICH2) and mpi4py must be installed on your system. If you haven't installed mpi4py yet, the following two guides cover installing, building and testing a sample program:

https://seethesource.wordpress.com/2015/01/05/raspberypi-hacks-part1/
https://seethesource.wordpress.com/2015/01/14/raspberypi-hacks-part2/

Once mpi4py is installed, you can start programming with it. This tutorial covers the important functions mpi4py provides, such as sending and receiving messages, scattering and gathering data, and broadcasting messages, and shows how each is used through examples. With this information, it is possible to build scalable, efficient distributed parallel programs in Python. So, let's begin.

 

2 Sending and receiving Messages

Communication in mpi4py is done using the send() and recv() methods. As the names suggest, they are used to send messages to and receive messages from other nodes, respectively.

2.1 Introduction to send()

The general syntax of this function is: comm.send(data, dest)

Here "data" can be any data or message (any Python object that can be pickled) that has to be sent to another node, and "dest" is the rank of the process to send it to.

Example: comm.send((rank+1)*5, dest=1)
This sends the value of (rank+1)*5 to the process with rank 1, so only that process can receive it.

2.2 Introduction to recv()

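The general syntax of this function is: data = comm.recv(source=source)

This tells a process to receive a message only from the process whose rank is given in the "source" parameter. The received object is the return value of recv(). Pass "source" as a keyword argument, because the first positional parameter of recv() is an optional buffer, not the source rank.

Example: data = comm.recv(source=1)
This receives a message only from the process with rank 1.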

2.3 Example with simple send() and recv()

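from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # rank 0 computes a value and sends it to rank 1
    data = (rank + 1) * 5
    comm.send(data, dest=1)
elif rank == 1:
    # rank 1 blocks until the message from rank 0 arrives
    data = comm.recv(source=0)
    print(data)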

(For the full program, refer to Example1.py)

[Download Example1.py]

2.4 Notes

  • When a node calls recv(), it waits until it receives data from the expected source. Once the data arrives, it continues with the rest of the program.
  • The "dest" parameter of send() and the "source" parameter of recv() need not be constant ranks; they can be expressions.
  • The "size" member of the "comm" object is a good way to make send() and recv() calls conditional, which leads to dynamic sending and receiving of messages.
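The last two notes can be made concrete with a small sketch. It assumes a ring layout, which is chosen here only for illustration, in which every rank sends to the next rank and receives from the previous one. The helper name ring_neighbors is hypothetical and not part of mpi4py; the code is plain Python so the arithmetic can be checked without an MPI launcher.

```python
def ring_neighbors(rank, size):
    """Return (dest, source) for a ring in which every rank sends to the
    next rank and receives from the previous one.

    Hypothetical helper for illustration; it is not part of mpi4py.
    """
    dest = (rank + 1) % size      # the last rank wraps around to rank 0
    source = (rank - 1) % size    # rank 0 receives from the last rank
    return dest, source


# Show the pairing for a 4-process run without needing MPI.
for r in range(4):
    d, s = ring_neighbors(r, 4)
    print("rank", r, "sends to", d, "and receives from", s)
```

In an mpi4py program the two values would be passed as comm.send(data, dest=dest) and comm.recv(source=source), with rank and size taken from comm.Get_rank() and comm.Get_size().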

2.5 Sending and receiving dynamically

Dynamic transfer of data is far more useful: it allows data to be sent and received by multiple nodes at once, and the decision to transfer can depend on the situation at run time, which makes programs much more flexible.

2.6 Example of dynamic sending and receiving of data

comm.send(data_shared,dest=(rank*2)%size)
comm.recv(source=(rank-3)%size)

The above two statements are dynamic because both the destination of the data and the source it is expected from depend on the values of rank and size, which are only known at run time. This eliminates the need to hard-code ranks. Note, however, that recv() receives only one message even if several messages qualify: it services the first message it receives and then continues with the next statement in the program.

(For the full implementation, refer to Example2.py)

[Download Example2.py]
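Because "dest" and "source" are computed independently, it can help to check before a run that every recv() actually has a send() aimed at it; a recv() without one waits forever. The following is a minimal sketch of such a check in plain Python, using the two expressions from the example above. The helper names are hypothetical, and the check assumes every rank runs both statements unconditionally, which the full Example2.py may not do.

```python
def send_target(rank, size):
    # Destination expression taken from the example above.
    return (rank * 2) % size


def expected_source(rank, size):
    # Source expression taken from the example above.
    return (rank - 3) % size


def unmatched_receivers(size):
    """List the ranks whose expected source does not send to them."""
    waiting = []
    for rank in range(size):
        src = expected_source(rank, size)
        if send_target(src, size) != rank:
            waiting.append(rank)
    return waiting


print(unmatched_receivers(5))
```

For 5 processes this reports ranks 0, 2, 3 and 4, so under the stated assumption only rank 1 would receive from the rank it names. A real program would either guard the calls with conditions on rank or choose expressions that pair up.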

 

3 Tagged send() and recv() functions

When we tag send() and recv() calls, we control the order in which messages are received, so we can be sure that one message is processed before another.

During dynamic transfer of data, situations arise where a particular send() must match a particular recv() to achieve a kind of synchronization. This can be done using the "tag" parameter of both send() and recv().

For example, a send() issued by the process with rank 1 can look like: comm.send(shared_data, dest=2, tag=1), and the matching recv() on the process with rank 2 would look like: comm.recv(source=1, tag=1)

This structure forces a match, leading to synchronized data transfers. The advantage of tagging is that a recv() can be made to wait until it receives data from the corresponding send() with the expected tag. It has to be used with extreme care, though: a recv() waiting for a tag that is never sent blocks forever, leaving the program in a deadlock.

3.1 Example

  if rank == 0:
      shared_data1 = 23
      comm.send(shared_data1, dest=3, tag=1)
      shared_data2 = 34
      comm.send(shared_data2, dest=3, tag=2)
  if rank == 3:
      # receive the tag=2 message first, although it was sent second
      recv_data1 = comm.recv(source=0, tag=2)
      print(recv_data1)
      recv_data2 = comm.recv(source=0, tag=1)
      print(recv_data2)

The output of this would look like:

34
23

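Thus, even though shared_data1 was sent first, the first recv() waited for the message with tag=2, received it and printed it. The second recv() then picked up the message with tag=1, which had been sent earlier. Note that this relies on the first send() returning before its message has been received; small messages like these are typically buffered by the MPI implementation, but with large messages the same pattern can deadlock.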

(For the full implementation, refer to Example3.py)

[Download Example3.py]
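The matching rule behind the example can be pictured with a toy model in plain Python. This is only a sketch of the idea, not how mpi4py is implemented: messages wait in arrival order, and a receive takes the first one that carries the requested tag. The function name recv_by_tag is hypothetical.

```python
def recv_by_tag(pending, tag):
    """Remove and return the first pending message carrying the given tag.

    `pending` is a list of (tag, data) pairs in arrival order. This is a toy
    model of tag matching for illustration, not how mpi4py is implemented.
    """
    for index, (msg_tag, data) in enumerate(pending):
        if msg_tag == tag:
            del pending[index]
            return data
    raise LookupError("no pending message with tag %r" % tag)


# Messages from rank 0 in the order they were sent: tag 1 first, then tag 2.
pending = [(1, 23), (2, 34)]

first = recv_by_tag(pending, tag=2)   # skips the tag-1 message
second = recv_by_tag(pending, tag=1)  # picks up the message left behind
print(first)
print(second)
```

One difference from the real thing: where the toy raises LookupError, an actual comm.recv() would keep waiting, which is how a mistyped tag turns into a deadlock.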

 

4 Broadcast

Broadcast adds a dynamic property to parallel programming: data that is generated once by the master can be broadcast to all the nodes. This avoids the repetitive procedure of sending the data to each node individually. The data is created on the master (root) node and sent from there to all the other nodes.

4.1 Example

   if rank == 0:
       data = {'a': 1, 'b': 2, 'c': 3}
   else:
       data = None
   # every rank calls bcast(); on return each one holds the root's data
   data = comm.bcast(data, root=0)
   print("rank", rank, data)

In the above program, the first parameter of the bcast() method, "data", is what has to be broadcast, and the second parameter, "root=0", is the rank of the process the data comes from. If we run this program using 5 processes, the output should look similar to the following (the order of the lines can vary between runs):

rank 0 {'a': 1, 'b': 2, 'c': 3}
rank 4 {'a': 1, 'b': 2, 'c': 3}
rank 3 {'a': 1, 'b': 2, 'c': 3}
rank 1 {'a': 1, 'b': 2, 'c': 3}
rank 2 {'a': 1, 'b': 2, 'c': 3}
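To see what the single bcast() call saves, here is a sketch of the repetitive procedure it replaces, written with only send() and recv(). The function name bcast_with_sends is hypothetical, and "comm" is assumed to be an mpi4py communicator such as MPI.COMM_WORLD that the caller passes in.

```python
def bcast_with_sends(comm, data, root=0):
    """Distribute `data` from `root` to every rank using only send()/recv().

    Hypothetical helper written for comparison with comm.bcast(); `comm` is
    expected to be an mpi4py communicator such as MPI.COMM_WORLD.
    """
    rank = comm.Get_rank()
    size = comm.Get_size()
    if rank == root:
        # The root issues one send per other rank.
        for dest in range(size):
            if dest != root:
                comm.send(data, dest=dest)
        return data
    # Every other rank waits for its copy from the root.
    return comm.recv(source=root)
```

In a script started under an MPI launcher, this would be called on every rank as data = bcast_with_sends(MPI.COMM_WORLD, data). comm.bcast() expresses the same intent in one call, and the MPI library is free to use a more efficient distribution scheme than this linear loop.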

5 Scatter

Scatter is the process of breaking up data and distributing each part to a different node. An example would be to break down a list and send each list element to a different node.

5.1 Example

   from mpi4py import MPI

   comm = MPI.COMM_WORLD
   size = comm.Get_size()
   rank = comm.Get_rank()
   if rank == 0:
       # the root builds one list element per process
       data = [(x + 1) ** x for x in range(size)]
       print('scattering data', data)
   else:
       data = None
   data = comm.scatter(data, root=0)
   print('rank', rank, 'has data:', data)

With 5 processes, the output of this program will be similar to the following (the order of the lines can vary between runs):

scattering data [1, 2, 9, 64, 625]
rank 0 has data: 1
rank 1 has data: 2
rank 2 has data: 9
rank 3 has data: 64
rank 4 has data: 625

Here the number of elements in data must be equal to the number of processes: if data has 10 elements and only 5 processes exist, an error occurs.
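One common way around this restriction is to group the elements into exactly as many sub-lists as there are processes before scattering. The sketch below shows that grouping in plain Python; split_into_chunks is a hypothetical helper, not an mpi4py function.

```python
def split_into_chunks(items, size):
    """Split `items` into exactly `size` lists whose lengths differ by at most one.

    Hypothetical helper; the resulting list has one entry per process, which
    is the shape comm.scatter() expects on the root.
    """
    base, extra = divmod(len(items), size)
    chunks = []
    start = 0
    for i in range(size):
        # The first `extra` chunks take one additional element.
        stop = start + base + (1 if i < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks


chunks = split_into_chunks(list(range(10)), 5)
print(chunks)
```

On the root, the result would be passed as comm.scatter(chunks, root=0), and each rank would then receive a sub-list rather than a single element.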

 

6 Gather

Gather is the opposite of scatter. It is used to collect data from the various nodes and store it as one object. An example would be to collect elements from various nodes and make a single list out of them.

6.1 Example

(Please note that this is the same as the previous example for scatter, with a few additions.)

   from mpi4py import MPI

   comm = MPI.COMM_WORLD
   size = comm.Get_size()
   rank = comm.Get_rank()
   if rank == 0:
       data = [(x + 1) ** x for x in range(size)]
       print('scattering data', data)
   else:
       data = None
   data = comm.scatter(data, root=0)
   print('rank', rank, 'has data:', data)

   # collect one value from every rank; only the root gets the list
   new_data = comm.gather(data, root=0)
   if rank == 0:
       print('master collected:', new_data)

With 5 processes, the output of this program will be similar to:

scattering data [1, 2, 9, 64, 625]
rank 0 has data: 1
rank 1 has data: 2
rank 2 has data: 9
rank 3 has data: 64
rank 4 has data: 625
master collected: [1, 2, 9, 64, 625]

In the above output, the last line is produced by the gather. Another thing that can be done is to change the data as soon as it is scattered, that is, once each node has received its part of the scattered data. Example: data = data + 1
When the gather then happens, the collected data is not the same as the data that was sent but reflects the change made on each node, which shows that the round trip works correctly.
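The whole round trip can be sketched as one function. The name scatter_increment_gather is hypothetical, and "comm" is assumed to be an mpi4py communicator such as MPI.COMM_WORLD supplied by the caller.

```python
def scatter_increment_gather(comm, root=0):
    """Scatter one value per rank, add 1 on every rank, gather on `root`.

    Returns the gathered list on `root` and None on every other rank, which
    is what comm.gather() itself returns.
    """
    rank = comm.Get_rank()
    size = comm.Get_size()
    if rank == root:
        data = [(x + 1) ** x for x in range(size)]
    else:
        data = None
    part = comm.scatter(data, root=root)
    part = part + 1          # each rank changes its own piece
    return comm.gather(part, root=root)
```

With 5 processes the root would start from [1, 2, 9, 64, 625] and collect [2, 3, 10, 65, 626], since gather() returns the pieces in rank order.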

 

7 Conclusion

This post has introduced the main programming constructs of mpi4py and their usage, which will help you write parallel programs for a distributed environment in the Python programming language. Although the constructs are shown individually above, most programs use all or most of them together. I would like to share some of the applications that can be built using mpi4py.

  • Automatic text summarization - a branch of natural language processing, where a distributed program can be written to summarize several text documents at once.
  • Sorting and searching - the distributed environment can be used to split the data to be sorted or searched, so that a huge amount of data can be processed in a small amount of time.
  • Mathematical problem solutions - problems such as Poisson's distribution can be solved much faster in a parallel environment.

So, we can write parallel programs for any kind of field and use the benefits provided by Python.
