Stick a Fork in It
Process data faster using ESE and fork-join
Last time I blogged about the ESE API, which makes it easy for IDL programs to make use of the ENVI Services Engine. (See the March 26, 2015 post titled “The ESE API for IDL”.) Typically you will want to use ESE to run PRO code remotely, on a cluster that is close to the data and has a lot of computational power. However, ESE can also run on a single-user machine just the same.
As you’ll recall, the ESE API makes it easy to write PRO code to run ESE tasks, monitor them and get results. All the complexity of making HTTP requests and such is handled by the API. This leads to PRO code that feels natural. For example:
oTask = ESE.findTask( 'penny64', 'ese_addition' )
oJob = oTask.run( a = 1, b = 2 )
print, oJob.answer
This runs the addition task on a remote machine named “penny64”. Simple. So let’s try something less simple. Let’s use ESE to parallelize a computation, thereby finding the answer faster.
We are going to use the fork-join coding pattern. This involves:
1) Splitting the data into independent chunks
2) Processing the chunks in parallel
3) Merging the results
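For readers more at home in Python, the same three steps can be sketched with the standard library’s concurrent.futures. This is a local analogy only, not the ESE API: ESE dispatches jobs to remote workers, while this sketch uses threads on one machine.

```python
from concurrent.futures import ThreadPoolExecutor

def chunk_sum(chunk):
    # Stand-in for the remote ESE task; here it just sums a chunk.
    return sum(chunk)

def fork_join_total(data, n_workers=4):
    # Fork: split the data into independent chunks.
    size = max(1, len(data) // n_workers)
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Run the chunks concurrently (ESE would send these to remote workers).
        futures = [pool.submit(chunk_sum, c) for c in chunks]
        # Join: block until every job finishes, then merge the partial sums.
        return sum(f.result() for f in futures)

print(fork_join_total(list(range(20))))  # 0 + 1 + ... + 19 = 190
```

The structure maps directly onto the IDL example that follows: submitting a future plays the role of an asynchronous task run, and collecting the results plays the role of the join and merge.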
The example we are going to use is trivial, but it illustrates the point. Instead of using IDL’s “total” function, we are going to write our own that uses ESE for the parallelization part of fork-join. First, consider this task:
pro sum, a = a, b = b, sum = sum
  sum = a + b
end
It’s simple, so just imagine that this is an amazingly brilliant, time-consuming algorithm suitable to the kind of data you work with. This is the one that will get you the Nobel Prize... in computer science. Shucks.
Now, examine the client code that computes the total, using the ESE API to run the task in parallel on chunks of the data:
pro fork_join_total
  ; generate the data
  n = 20 ; an even number, please
  m = n / 2
  data = indgen( n )

  ; get the task that simply adds two numbers
  oTask = ESE.findTask( 'penny64', 'sum' )

  ; run the task on pairs of numbers
  ;
  ; Several pairs are summed at the same time. This is where we get our
  ; performance improvement over doing them one at a time, linearly.
  oJobs = objarr( m )
  for i = 0, m - 1 do begin
    x = 2 * i
    oJobs[ i ] = oTask.run( a = data[ x ], b = data[ x + 1 ], /async )
  endfor

  ; wait for all jobs to complete
  status = ESE.join( oJobs )

  ; merge the results from all the jobs that ran independently
  sumTotal = 0
  for i = 0, m - 1 do begin
    sumTotal += oJobs[ i ].sum
  endfor
  print, sumTotal
end
The key bits of code that should grab your eye are:
oJobs[ i ] = oTask.run( a = data[ x ], b = data[ x + 1 ], /async )
and
status = ESE.join( oJobs )
That first line comes from the loop that hands off chunks of data to the “sum” algorithm. Note the “/async”. This allows the loop to continue iterating over the chunks of data, firing off more calls to the algorithm before the prior ones have finished.
That other line blocks the client application until all externally running jobs are completed. There are options to ignore failures (and keep processing) and to report job status via a callback mechanism. Those features are described in the documentation. Note that we’re not doing error checking, which the “status” variable could help with.
There is actually another important line that should draw your attention:
sumTotal += oJobs[ i ].sum
This comes from the loop that merges the results of each independent job. This is the standard pattern in the fork-join model. Split the data. Run an algorithm independently on the chunks. Wait. Merge the results into something more meaningful.
You may have noticed that fork-join sounds a lot like map-reduce (Hadoop style). There are differences, but the basic idea is the same: divide and conquer. If your data, algorithm and computer system allow for it, fork-join should decrease the total time to perform a computation.
When might it not pay off? If concurrently running tasks overly compete for system resources then one may not see benefits. This is highly system dependent, as machines will vary in disk, network and CPU performance. Performance is also dependent upon the algorithm. For example, if the computation spends most of its time reading and writing to disk (it’s I/O bound), then the disk and network will be the limiting factors.
To show how performance might improve with parallelization, our task has been modified to invoke FFT. This makes heavier use of the CPU, slowing down the computation and hence better simulating a real-world computation:
pro sum, a = a, b = b, sum = sum
  f = fft( randomu( s, 1729, 1729, 5 ) )
  sum = a + b
end
The following graph shows how parallelization can greatly improve performance and yet how throwing more ESE workers at a computation has diminishing returns:
[Graph: computation time versus number of ESE workers]
In this case, the sweet spot is about 4 ESE workers. A different computation, on a different system, might have a different graph and hence a different optimal number of ESE workers.
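This diminishing-returns shape is what Amdahl’s law predicts: if only a fraction p of the work can run in parallel, the achievable speedup with n workers is bounded. The sketch below uses illustrative numbers (p = 0.9 is an assumption for demonstration, not a measurement from this ESE example):

```python
def amdahl_speedup(p, n):
    """Amdahl's law: ideal speedup with n workers when a fraction
    p of the work (0 <= p <= 1) can be parallelized."""
    return 1.0 / ((1.0 - p) + p / n)

# With 90% of the work parallelizable, each added worker helps less:
for n in (1, 2, 4, 8, 16):
    print(n, round(amdahl_speedup(0.9, n), 2))
# 1 -> 1.0, 2 -> 1.82, 4 -> 3.08, 8 -> 4.71, 16 -> 6.4
```

Resource contention (disk, network, CPU) effectively lowers p, which is why the sweet spot for a real computation, like the 4 workers above, arrives sooner than the ideal curve suggests.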
If you have a copy of the ENVI Services Engine then you can run this example. See the attached task and client code. Be sure to adjust the machine name from “penny64” to the name or IP address of the machine that hosts your ESE installation. Also, don’t forget to upload the task. Finally, note that despite the name, ESE can run plain-old IDL code, sans ENVI. Moreover, a local installation of ESE is not required.
For more information, see the online IDL and ESE documentation: http://www.exelisvis.com/docs/. The ESE API docs are part of the IDL documentation.