julia | source | downloads | docs | packages | blog | community | learning | teaching | publications | juliacon

This post walks through the parallel computing functionality of Julia
to implement an asynchronous parallel version of the classical
*cutting-plane* algorithm for convex (nonsmooth) optimization,
demonstrating the complete workflow including running on both Amazon
EC2 and a large multicore server. I will quickly review the
cutting-plane algorithm and will be focusing primarily on parallel
computation patterns, so don’t worry if you’re not familiar with the
optimization side of things.

The cutting-plane algorithm is a method for solving the optimization problem

$$\text{minimize} _ {x \in \mathbb{R}^d} \sum_{i=1}^n f_i(x) $$

where the functions \( f_i \) are convex but not necessarily differentiable. The absolute value function \( |x| \) and the 1-norm \( ||x|| _ 1 \) are typical examples. Important applications also arise from Lagrangian relaxation. The idea of the algorithm is to approximate the functions \( f_i \) with piecewise linear models \( m_i \) which are built up from information obtained by evaluating \( f_i \) at different points. We iteratively minimize over the models to generate candidate solution points.

We can state the algorithm as

- Choose starting point \( x \).
- For \(i = 1,\ldots,n\), evaluate \( f_i(x) \) and update corresponding model \( m_i \).
- Let the next candidate \( x \) be the minimizer of \( \sum_{i=1}^n m_i(x) \).
- If not converged, goto step 2.

If it is costly to evaluate \( f_i(x) \), then the algorithm is naturally parallelizable at step 2. The minimization in step 3 can be computed by solving a linear optimization problem, which is usually very fast. (Let me point out here that Julia has interfaces to linear programming and other optimization solvers under JuliaOpt.)

Abstracting the math, we can write the algorithm using the following Julia code.

```
# functions initialize, isconverged, solvesubproblem, and process implemented elsewhere
state, subproblems = initialize()
while !isconverged(state)
results = map(solvesubproblem,subproblems)
state, subproblems = process(state, results)
end
```

The function `solvesubproblem`

corresponds to evaluating \( f_i(x) \) for a
given \( i \) and \( x \) (the elements of `subproblems`

could be tuples
`(i,x)`

). The function `process`

corresponds to minimizing the model in step
3, and it produces a new state and a new set of subproblems to solve.

Note that the algorithm looks much like a map-reduce that would be easy to
parallelize using many existing frameworks. Indeed, in Julia we can simply
replace `map`

with `pmap`

(parallel map). Let’s consider a twist that makes
the parallelism not so straightforward.

Variability in the time taken by the `solvesubproblem`

function can lead to
load imbalance and limit parallel efficiency as workers sit idle waiting for new
tasks. Such variability arises naturally if `solvesubproblem`

itself requires
solving a optimization problem, or if the workers and network are shared, as is
often the case with cloud computing.

We can consider a new variant of the cutting-plane algorithm to address this issue. The key point is

- When proportion \(0 < \alpha \le 1 \) of subproblems for a given candidate have been solved, generate a new candidate and corresponding set of subproblems by using whatever information is presently available.

In other words, we generate new tasks to feed to workers without needing to wait for all current tasks to complete, making the algorithm asynchronous. The algorithm remains convergent, although the total number of iterations may increase. For more details, see this paper by Jeff Linderoth and Stephen Wright.

By introducing asynchronicity we can no longer use a nice black-box `pmap`

function and have to dig deeper into the parallel implementation. Fortunately,
this is easy to do in Julia.

Julia implements distributed-memory parallelism based on one-sided message
passing, where process push work onto others (via `remotecall`

) and the
results are retrieved (via `fetch`

) by the process which requires them. Macros
such as `@spawn`

and `@parallel`

provide pretty syntax around this low-level
functionality. This model of parallelism is very different from the typical
SIMD style of MPI. Both approaches are useful in different contexts, and I
expect an MPI wrapper for Julia will appear in the future (see also here).

Reading the manual on parallel computing is highly recommended, and I won’t try to reproduce it in this post. Instead, we’ll dig into and extend one of the examples it presents.

The implementation of `pmap`

in Julia is

```
function pmap(f, lst)
np = nprocs() # determine the number of processors available
n = length(lst)
results = cell(n)
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
next_idx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while true
idx = next_idx()
if idx > n
break
end
results[idx] = remotecall_fetch(p, f, lst[idx])
end
end
end
end
end
results
end
```

On first sight, this code is not particularly intuitive. The `@spawnlocal`

macro creates a *task*
on the *master process* (e.g. process 1). Each task feeds work to a
corresponding worker; the call `remotecall_fetch(p, f, lst[idx])`

function
calls `f`

on process `p`

and returns the result when finished. Tasks are
uninterruptable and only surrender control at specific points such as
`remotecall_fetch`

. Tasks cannot directly modify variables from the enclosing
scope, but the same effect can be achieved by using the `next_idx`

function to
access and mutate `i`

. *The task idiom functions in place of using a loop to
poll for results from each worker process.*

Implementing our asynchronous algorithm is not much more than a modification of the above code:

```
# given constants n and 0 < alpha <= 1
# functions initialize and solvesubproblem defined elsewhere
np = nprocs()
state, subproblems = initialize()
converged = false
isconverged() = converged
function updatemodel(mysubproblem, result)
# store result
...
# decide whether to generate new subproblems
state.numback[mysubproblem.parent] += 1
if state.numback[mysubproblem.parent] >= alpha*n && !state.didtrigger[mysubproblem.parent]
state.didtrigger[mysubproblem.parent] = true
# generate newsubproblems by solving linear optimization problem
...
if ... # convergence test
converged = true
else
append!(subproblems, newsubproblems)
push!(state.didtrigger, false)
push!(state.numback, 0)
# ensure that for s in newsubproblems, s.parent == length(state.numback)
end
end
end
@sync begin
for p=1:np
if p != myid() || np == 1
@spawnlocal begin
while !isconverged()
if length(subproblems) == 0
# no more subproblems but haven't converged yet
yield()
continue
end
mysubproblem = shift!(subproblems) # pop subproblem from queue
result = remotecall_fetch(p, solvesubproblem, mysubproblem)
updatemodel(mysubproblem, result)
end
end
end
end
end
```

where `state`

is an instance of a type defined as

```
type State
didtrigger::Vector{Bool}
numback::Vector{Int}
...
end
```

There is little difference in the structure of the code inside the `@sync`

blocks, and the asynchronous logic is encapsulated in the local `updatemodel`

function which conditionally generates new subproblems. A strength of Julia is
that functions like `pmap`

are implemented in Julia itself, so that it is
particularly straightforward to make modifications like this.

Now for the fun part. The complete cutting-plane algorithm (along with
additional variants) is implemented in JuliaBenders. The code is
specialized for stochastic
programming where the cutting-plane algorithm is known as the L-shaped
method or Benders decomposition and is used to decompose the solution of
large linear optimization problems. Here, `solvesubproblem`

entails solving a
relatively small linear optimization problem. Test instances are taken from the
previously mentioned paper.

We’ll first run on a large multicore server. The
`runals.jl`

(asynchronous L-shaped) file contains the algorithm we’ll use. Its
usage is

```
julia runals.jl [data source] [num subproblems] [async param] [block size]
```

where `[num subproblems]`

is the \(n\) as above and `[async param]`

is
the proportion \(\alpha\). By setting \(\alpha = 1\) we obtain the
synchronous algorithm. For the asynchronous version we will take \(\alpha =
0.6\). The `[block size]`

parameter controls how many subproblems are sent to
a worker at once (in the previous code, this value was always 1). We will use
4000 subproblems in our experiments.

To run multiple Julia processes on a shared-memory machine, we pass the `-p N`

option to the `julia`

executable, which will start up `N`

system processes.
To execute the asynchronous version with 10 workers, we run

```
julia -p 12 runals.jl Data/storm 4000 0.6 30
```

Note that we start 12 processes. These are the 10 workers, the master (which distributes tasks), and another process to perform the master’s computations (an additional refinement which was not described above). Results from various runs are presented in the table below.

Synchronous | Asynchronous | |||

No. Workers | Speed | Efficiency | Speed | Efficiency |

10 | 154 | Baseline | 166 | Baseline |

20 | 309 | 100.3% | 348 | 105% |

40 | 517 | 84% | 654 | 98% |

60 | 674 | 73% | 918 | 92% |

There are a few more hoops to jump through in order to run on EC2. First we must
build a system image (AMI) with Julia installed. Julia connects to workers over
ssh, so I found it useful to put my EC2 ssh key on the AMI and also set
`StrictHostKeyChecking no`

in `/etc/ssh/ssh_config`

to disable the
authenticity prompt when connecting to new workers. Someone will likely correct
me on if this is the right approach.

Assuming we have an AMI in place, we can fire up the instances. I used an m3.xlarge instance for the master and m1.medium instances for the workers. (Note: you can save a lot of money by using the spot market.)

To add remote workers on startup, Julia accepts a file with a list of host names
through the `--machinefile`

option. We can generate this easily enough by
using the EC2 API Tools (Ubuntu package `ec2-api-tools`

) with the command

```
ec2-describe-instances | grep running | awk '{ print $5; }' > mfile
```

On the master instance we can then run

```
julia --machinefile mfile runatr.jl Data/storm 4000 0.6 30
```

Results from various runs are presented in the table below.

Synchronous | Asynchronous | |||

No. Workers | Speed | Efficiency | Speed | Efficiency |

10 | 149 | Baseline | 151 | Baseline |

20 | 289 | 97% | 301 | 99.7% |

40 | 532 | 89% | 602 | 99.5% |

On both architectures the asynchronous version solves subproblems at a higher rate and has significantly better parallel efficiency. Scaling is better on EC2 than on the shared-memory server likely because the subproblem calculation is memory bound, and so performance is better on the distributed-memory architecture. Anyway, with Julia we can easily experiment on both.

A more detailed tutorial was prepared for the Julia IAP session at MIT in January 2013.