Distributed Numerical Optimization

5 April 2013 | Miles Lubin

This post walks through the parallel computing functionality of Julia to implement an asynchronous parallel version of the classical cutting-plane algorithm for convex (nonsmooth) optimization, demonstrating the complete workflow including running on both Amazon EC2 and a large multicore server. I will quickly review the cutting-plane algorithm and will be focusing primarily on parallel computation patterns, so don't worry if you're not familiar with the optimization side of things.

Cutting-plane algorithm

The cutting-plane algorithm is a method for solving the optimization problem

minxRdi=1nfi(x)\min_{x \in \mathbb R^d} \sum_{i=1}^n f_i(x)

where the functions fif_i are convex but not necessarily differentiable. The absolute value function x|x| and the 1-norm x1 ||x|| _ 1 are typical examples. Important applications also arise from Lagrangian relaxation. The idea of the algorithm is to approximate the functions fif_i with piecewise linear models mi m_i which are built up from information obtained by evaluating fi f_i at different points. We iteratively minimize over the models to generate candidate solution points.

We can state the algorithm as

  1. Choose starting point x x .

  2. For i=1,,ni = 1,\ldots,n, evaluate fi(x)f_i(x) and update corresponding model mi m_i .

  3. Let the next candidate x x be the minimizer of i=1nmi(x) \sum_{i=1}^n m_i(x) .

  4. If not converged, goto step 2.

If it is costly to evaluate fi(x) f_i(x) , then the algorithm is naturally parallelizable at step 2. The minimization in step 3 can be computed by solving a linear optimization problem, which is usually very fast. (Let me point out here that Julia has interfaces to linear programming and other optimization solvers under JuliaOpt.

Abstracting the math, we can write the algorithm using the following Julia code.

# functions initialize, isconverged, solvesubproblem, and process implemented elsewhere
state, subproblems = initialize()
while !isconverged(state)
    results = map(solvesubproblem,subproblems)
    state, subproblems = process(state, results)

The function solvesubproblem corresponds to evaluating fi(x)f_i(x) for a given i i and x x (the elements of subproblems could be tuples (i,x)). The function process corresponds to minimizing the model in step 3, and it produces a new state and a new set of subproblems to solve.

Note that the algorithm looks much like a map-reduce that would be easy to parallelize using many existing frameworks. Indeed, in Julia we can simply replace map with pmap (parallel map). Let's consider a twist that makes the parallelism not so straightforward.

Asynchronous variant

Variability in the time taken by the solvesubproblem function can lead to load imbalance and limit parallel efficiency as workers sit idle waiting for new tasks. Such variability arises naturally if solvesubproblem itself requires solving a optimization problem, or if the workers and network are shared, as is often the case with cloud computing.

We can consider a new variant of the cutting-plane algorithm to address this issue. The key point is

In other words, we generate new tasks to feed to workers without needing to wait for all current tasks to complete, making the algorithm asynchronous. The algorithm remains convergent, although the total number of iterations may increase. For more details, see this paper by Jeff Linderoth and Stephen Wright.

By introducing asynchronicity we can no longer use a nice black-box pmap function and have to dig deeper into the parallel implementation. Fortunately, this is easy to do in Julia.

Parallel implementation in Julia

Julia implements distributed-memory parallelism based on one-sided message passing, where process push work onto others (via remotecall) and the results are retrieved (via fetch) by the process which requires them. Macros such as @spawn and @parallel provide pretty syntax around this low-level functionality. This model of parallelism is very different from the typical SIMD style of MPI. Both approaches are useful in different contexts, and I expect an MPI wrapper for Julia will appear in the future (see also here).

Reading the manual on parallel computing is highly recommended, and I won't try to reproduce it in this post. Instead, we'll dig into and extend one of the examples it presents.

The implementation of pmap in Julia is

function pmap(f, lst)
    np = nprocs()  # determine the number of processors available
    n = length(lst)
    results = cell(n)
    i = 1
    # function to produce the next work item from the queue.
    # in this case it's just an index.
    next_idx() = (idx=i; i+=1; idx)
    @sync begin
        for p=1:np
            if p != myid() || np == 1
                @spawnlocal begin
                    while true
                        idx = next_idx()
                        if idx > n
                        results[idx] = remotecall_fetch(p, f, lst[idx])

On first sight, this code is not particularly intuitive. The @spawnlocal macro creates a task on the master process (e.g. process 1). Each task feeds work to a corresponding worker; the call remotecall_fetch(p, f, lst[idx]) function calls f on process p and returns the result when finished. Tasks are uninterruptible and only surrender control at specific points such as remotecall_fetch. Tasks cannot directly modify variables from the enclosing scope, but the same effect can be achieved by using the next_idx function to access and mutate i. The task idiom functions in place of using a loop to poll for results from each worker process.

Implementing our asynchronous algorithm is not much more than a modification of the above code:

# given constants n and 0 < alpha <= 1
# functions initialize and solvesubproblem defined elsewhere
np = nprocs()
state, subproblems = initialize()
converged = false
isconverged() = converged
function updatemodel(mysubproblem, result)
    # store result
    # decide whether to generate new subproblems
    state.numback[mysubproblem.parent] += 1
    if state.numback[mysubproblem.parent] >= alpha*n && !state.didtrigger[mysubproblem.parent]
        state.didtrigger[mysubproblem.parent] = true
        # generate newsubproblems by solving linear optimization problem
        if ... # convergence test
            converged = true
            append!(subproblems, newsubproblems)
            push!(state.didtrigger, false)
            push!(state.numback, 0)
            # ensure that for s in newsubproblems, s.parent == length(state.numback)

@sync begin
    for p=1:np
        if p != myid() || np == 1
            @spawnlocal begin
                while !isconverged()
                    if length(subproblems) == 0
                        # no more subproblems but haven't converged yet
                    mysubproblem = shift!(subproblems) # pop subproblem from queue
                    result = remotecall_fetch(p, solvesubproblem, mysubproblem)
                    updatemodel(mysubproblem, result)

where state is an instance of a type defined as

type State

There is little difference in the structure of the code inside the @sync blocks, and the asynchronous logic is encapsulated in the local updatemodel function which conditionally generates new subproblems. A strength of Julia is that functions like pmap are implemented in Julia itself, so that it is particularly straightforward to make modifications like this.

Running it

Now for the fun part. The complete cutting-plane algorithm (along with additional variants) is implemented in JuliaBenders. The code is specialized for stochastic programming where the cutting-plane algorithm is known as the L-shaped method or Benders decomposition and is used to decompose the solution of large linear optimization problems. Here, solvesubproblem entails solving a relatively small linear optimization problem. Test instances are taken from the previously mentioned paper.

We'll first run on a large multicore server. The runals.jl (asynchronous L-shaped) file contains the algorithm we'll use. Its usage is

julia runals.jl [data source] [num subproblems] [async param] [block size]

where [num subproblems] is the nn as above and [async param] is the proportion α\alpha. By setting α=1\alpha = 1 we obtain the synchronous algorithm. For the asynchronous version we will take α=0.6\alpha = 0.6. The [block size] parameter controls how many subproblems are sent to a worker at once (in the previous code, this value was always 1). We will use 4000 subproblems in our experiments.

To run multiple Julia processes on a shared-memory machine, we pass the -p N option to the julia executable, which will start up N system processes. To execute the asynchronous version with 10 workers, we run

julia -p 12 runals.jl Data/storm 4000 0.6 30

Note that we start 12 processes. These are the 10 workers, the master (which distributes tasks), and another process to perform the master's computations (an additional refinement which was not described above). Results from various runs are presented in the table below.

Synchronous Asynchronous
No. Workers Speed Efficiency Speed Efficiency
10 154 Baseline 166 Baseline
20 309 100.3% 348 105%
40 517 84% 654 98%
60 674 73% 918 92%

Table: Results on a shared-memory 8x Xeon E7-8850 server. Workers correspond to individual cores. Speed is the rate of subproblems solved per second. Efficiency is calculated as the percent of ideal parallel speedup obtained. The superlinear scaling observed with 20 workers is likely a system artifact.

There are a few more hoops to jump through in order to run on EC2. First we must build a system image (AMI) with Julia installed. Julia connects to workers over ssh, so I found it useful to put my EC2 ssh key on the AMI and also set StrictHostKeyChecking no in /etc/ssh/ssh_config to disable the authenticity prompt when connecting to new workers. Someone will likely correct me on if this is the right approach.

Assuming we have an AMI in place, we can fire up the instances. I used an m3.xlarge instance for the master and m1.medium instances for the workers. (Note: you can save a lot of money by using the spot market.)

To add remote workers on startup, Julia accepts a file with a list of host names through the --machinefile option. We can generate this easily enough by using the EC2 API Tools (Ubuntu package ec2-api-tools) with the command

ec2-describe-instances | grep running | awk '{ print $5; }' > mfile

On the master instance we can then run

julia --machinefile mfile runatr.jl Data/storm 4000 0.6 30

Results from various runs are presented in the table below.

Synchronous Asynchronous
No. Workers Speed Efficiency Speed Efficiency
10 149 Baseline 151 Baseline
20 289 97% 301 99.7%
40 532 89% 602 99.5%

Table: Results on Amazon EC2. Workers correspond to individual m1.medium instances. The master process is run on an m3.xlarge instance.

On both architectures the asynchronous version solves subproblems at a higher rate and has significantly better parallel efficiency. Scaling is better on EC2 than on the shared-memory server likely because the subproblem calculation is memory bound, and so performance is better on the distributed-memory architecture. Anyway, with Julia we can easily experiment on both.

Further reading

A more detailed tutorial was prepared for the Julia IAP session at MIT in January 2013.

Creative Commons License
Distributed Numerical Optimization by Miles Lubin is licensed under a Creative Commons Attribution 4.0 International License.