Easy parallelization with data decomposition
Recently I came across this blog post which introduced me to the new multiprocessing module in Python 2.6, a module for executing multiple concurrent processes. It makes parallelizing your programs very easy. The author also provided a smart code snippet that makes using multiprocessing even easier. I studied how the snippet works and came up with an alternative solution which is, in my opinion, very elegant and easy to read. I'm so excited about the new possibilities provided by this module that I had to spread the word. But first, some background.
The multi-core trend
Moore’s law states that:
The density of transistors on chips doubles every 24 months.
Although Moore's law, contrary to what is often thought, still holds true, the exponential growth in transistor count predicted by Moore does not always translate into exponentially greater practical computing performance. Parallel computation has therefore become necessary to take full advantage of the gains allowed by Moore's law. This explains the recent multi-core trend: most recent computers are equipped with two or more cores.
The problem is that you can't just run your programs on a multi-core computer and hope they will be faster. Programs need to be modified to operate in a parallel fashion as opposed to a sequential fashion.
At the same time, languages like Ruby and Python are famous for their GIL (Global Interpreter Lock). Because of the GIL, even programs that are designed to be parallel can effectively use only one core at a time, resulting in no speed improvement. Parallelism here is just an illusion: the processor switches between threads, but does so so frequently that the user perceives the operations as being performed in parallel.
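To make this concrete, here is a small sketch (the function and variable names are mine, for illustration only): CPU-bound work split across two threads still produces correct results, but because the threads take turns holding the GIL, they cannot run on two cores at the same time, so the wall-clock time is not halved.

```python
# Sketch: CPU-bound work in two threads. The results are correct, but the
# GIL lets only one thread execute Python bytecode at a time, so this gains
# nothing from a second core.
import threading

def sum_up_to(n, results, i):
    # Pure CPU work: no I/O, so the thread rarely releases the GIL.
    total = 0
    while n > 0:
        total += n
        n -= 1
    results[i] = total

results = [None, None]
threads = [threading.Thread(target=sum_up_to, args=(100000, results, i))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # both answers are right; the speedup, however, is absent
```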
The novelty of the multiprocessing module in Python 2.6 is that it uses processes instead of threads (see Threads compared with processes), so it does not suffer from the GIL. Programs running on multi-core machines can therefore operate in a truly parallel fashion.
To make things simpler, let me quote the excellent blog post How-to Split a Problem into Tasks.
The very first step in every successful parallelization effort is always the same: you take a look at the problem that needs to be solved and start splitting it into tasks that can be computed in parallel. [...] what I am describing here is also called problem decomposition. The goal here is to divide the problem into several smaller subproblems, called tasks that can be computed in parallel later on. The tasks can be of different size and must not necessarily be independent.
And, about data decomposition:
When data structures with large amounts of similar data need to be processed, data decomposition is usually a well-performing decomposition technique. The tasks in this strategy consist of groups of data. These can be either input data, output data or even intermediate data, decompositions to all varieties are possible and may be useful. All processors perform the same operations on these data, which are often independent from one another. This is my favorite decomposition technique, because it is usually easy to do, often has no dependencies in between tasks and scales really well.
Data decomposition is so straightforward that such problems can without any doubt be called embarrassingly parallel.
If you are a Python user, you most probably know list comprehensions:
>>> from math import sqrt
>>> [sqrt(i) for i in [1, 4, 9, 16]]
[1.0, 2.0, 3.0, 4.0]
In this example, sqrt is applied to each element of the list and a list is returned. The resulting list and the input list are therefore the same size.
Probably less known are generator comprehensions, which can be written by replacing the outer brackets with parentheses:
>>> gen = (sqrt(i) for i in [1, 4, 9, 16])
>>> gen
<generator object at 0xb7cec56c>
>>> for i in gen: print i
...
1.0
2.0
3.0
4.0
The difference between list and generator comprehensions is that list comprehensions are evaluated entirely before returning, while generator comprehensions yield results one by one. Generators are therefore more "lazy" and can result in big memory savings when iterating over large lists.
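This laziness is easy to observe. In the sketch below (`traced_sqrt` is a helper name of my own, used for illustration), the wrapped function records each call: nothing is computed when the generator is created, and each value is computed only when requested.

```python
# Observing laziness: a wrapper that records each call. A list comprehension
# would do all the work up front; the generator computes values on demand.
from math import sqrt

calls = []

def traced_sqrt(x):
    calls.append(x)   # record that the function was actually invoked
    return sqrt(x)

gen = (traced_sqrt(i) for i in [1, 4, 9, 16])
print(calls)          # []   -- nothing computed yet

first = next(gen)
print(calls, first)   # [1] 1.0   -- one item computed, on demand
```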
The generator comprehension's own parentheses can even be omitted when it is the sole argument of a function call:
>>> print(sqrt(i) for i in [1, 4, 9, 16])
<generator object at 0xb7cec68c>
For those more familiar with functional programming, list comprehensions are similar to the map higher-order function.
In fact, Python has a built-in map function too.
>>> map(sqrt, [1, 4, 9, 16])
[1.0, 2.0, 3.0, 4.0]
While map applies a function to each element of a list and returns the resulting list, reduce is a higher-order function that uses another function to combine the elements of a list in some way.
>>> plus = concatenate = lambda x, y: x + y
>>> reduce(plus, [1, 2, 3, 4])
10
>>> reduce(concatenate, [[1, 2], [3, 4]])
[1, 2, 3, 4]
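The two compose naturally: map transforms every element, and reduce folds the transformed list into a single value. A small sketch (note that reduce has also been available as functools.reduce since Python 2.6):

```python
# map + reduce composed: a sum of squares.
from functools import reduce  # reduce is also a built-in in Python 2

squares = map(lambda x: x * x, [1, 2, 3, 4])
total = reduce(lambda x, y: x + y, squares)
print(total)  # 1 + 4 + 9 + 16 = 30
```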
multiprocessing.Pool’s map
Applying a function to each element of a list with map more or less assumes that the function is pure, i.e. that its output is a function only of its input arguments. Although nothing prevents you from passing an impure function to map, it is dirty, potentially dangerous and against the functional philosophy. Concretely, it means that your functions had better not use global variables, or anything whose state may change during program execution. This also rules out instance methods (an object, in essence, encapsulates a state).
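A quick sketch of why this matters (the two function names below are mine, chosen for illustration): a pure function gives the same answer no matter in what order, or in how many processes, the elements are handled, while an impure one depends on hidden state.

```python
# Pure vs. impure functions under map-style decomposition.
state = {"factor": 1}

def scaled_impure(x):
    state["factor"] += 1          # hidden side effect: result depends on
    return x * state["factor"]    # how many calls happened before this one

def scaled_pure(x, factor=2):
    return x * factor             # result depends on the inputs only

impure_results = [scaled_impure(i) for i in [1, 2, 3]]
pure_results = [scaled_pure(i) for i in [1, 2, 3]]
print(impure_results)  # [2, 6, 12]: order-dependent, unsafe to parallelize
print(pure_results)    # [2, 4, 6]: same answer in any order
```

With multiprocessing the situation is even worse than with threads: each worker process gets its own copy of the globals, so mutations made in one worker are silently invisible to the others.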
To reuse the terminology above, if we think of applying our function to each element of the list as tasks, then our tasks are independent from each other, and so there is no reason to operate over the list sequentially. Independence is also very nice because communication and collaboration between threads or processes happen to be among the most difficult aspects of concurrent programming. Here, no communication between threads or processes is required.
And here comes the new multiprocessing module and more particularly its Pool class. This class represents pools of worker processes and has a map method, which is similar to the map built-in function.
>>> from multiprocessing import Pool
>>> pool = Pool(processes=4)
>>> pool.map(sqrt, [1, 4, 9, 16])
[1.0, 2.0, 3.0, 4.0]
The difference from the built-in map here is that 4 processes are used. This will result in roughly a 4x speedup if the computer running the program has at least 4 cores. Of course, sqrt is a toy example, but here is a real-life example in a machine learning context.
>>> image_sets = [set1, ..., setn]
>>> preprocessed = pool.map(preprocess, image_sets)
>>> feat_sets = pool.map(feat_extract, preprocessed)
>>> models = pool.map(train, feat_sets)
As long as you can write your code as list comprehensions, you can apply the data decomposition approach. It’s easy, abuse it!
However, spawning processes and shipping data between them has a cost. Therefore, when the function applied to each element returns quasi-instantaneously, it may be worth splitting the data into larger chunks, running each chunk in a separate process and then recombining the results with reduce. (See also MapReduce.)
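Here is a sketch of the chunking idea (the helper names are mine, for illustration). The plain built-in map is used so the sketch runs anywhere; with top-level, picklable functions, pool.map is a drop-in replacement. Note also that Pool.map itself accepts an optional chunksize argument that performs this batching internally.

```python
# Chunking sketch: ship whole chunks to the workers instead of single
# elements, then merge the partial results with reduce.
from functools import reduce  # reduce is also a built-in in Python 2
from math import sqrt

def chunks(data, n):
    # Split data into n contiguous chunks of roughly equal size.
    size = len(data) // n + (1 if len(data) % n else 0)
    return [data[i:i + size] for i in range(0, len(data), size)]

def sqrt_chunk(chunk):
    # One task per chunk, not per element: far fewer task dispatches.
    return [sqrt(x) for x in chunk]

data = [1, 4, 9, 16, 25, 36]
partials = map(sqrt_chunk, chunks(data, 2))    # swap in pool.map(...) here
result = reduce(lambda a, b: a + b, partials)  # concatenate partial lists
print(result)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
```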
Here are some helpers which make parallelizing your list comprehensions even more straightforward and easy to read.
As mentioned before, the blog post that introduced me to this new multiprocessing module also came with a smart code snippet. I reworked it to fit my liking and this is what it looks like now:
>>> sqrtd = delayed(sqrt)
>>> powd = delayed(pow)
>>> squares = [1, 4, 9, 16]
>>> pool_parallelize([sqrtd(i) for i in squares], njobs=4)
[1.0, 2.0, 3.0, 4.0]
>>> pool_parallelize([powd(i, 0.5) for i in squares], njobs=4)
[1.0, 2.0, 3.0, 4.0]
Unlike Pool's map, this supports parallelizing functions of any arity.
Then I came up with this solution, which reduces typing and is quite elegant.
>>> sqrtp = parallelized(sqrt)
>>> powp = parallelized(pow)
>>> sqrtp(squares, njobs=4)
[1.0, 2.0, 3.0, 4.0]
>>> powp([(i, 0.5) for i in squares], njobs=4)
[1.0, 2.0, 3.0, 4.0]
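The real implementation is in the linked code below; purely as an illustration of the idea, here is one way such a parallelized wrapper could look. This sketch is my own guess, not the author's code: bare arguments or tuples of arguments are accepted (hence any arity), a plain loop is used for njobs=1, and a Pool is used otherwise (which requires the wrapped function to be picklable, i.e. defined at a module's top level).

```python
# Hypothetical sketch of a `parallelized` wrapper; the real code is linked below.
from math import sqrt
from multiprocessing import Pool

def _apply_args(task):
    # Helper unpacking a (function, argument-tuple) pair inside a worker.
    func, args = task
    return func(*args)

def parallelized(func):
    def wrapper(args_list, njobs=1):
        # Normalize: a bare argument becomes a 1-tuple, so any arity works.
        tasks = [a if isinstance(a, tuple) else (a,) for a in args_list]
        if njobs == 1:
            return [func(*t) for t in tasks]
        pool = Pool(processes=njobs)
        try:
            return pool.map(_apply_args, [(func, t) for t in tasks])
        finally:
            pool.close()
            pool.join()
    return wrapper

sqrtp = parallelized(sqrt)
powp = parallelized(pow)
print(sqrtp([1, 4, 9, 16]))                     # [1.0, 2.0, 3.0, 4.0]
print(powp([(i, 0.5) for i in [1, 4, 9, 16]]))  # [1.0, 2.0, 3.0, 4.0]
```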
Code available here.