Sunday, April 12, 2009

Matrix multiplication

from http://artisans-serverintellect-com.si-eioswww6.com/default.asp?W40

The example below shows how a sequential algorithm, namely matrix multiplication, can be parallelized using jsr166y/forkjoin. We begin with a sequential matrix multiplication algorithm, adapted from Jama. Next, we show how the sequential algorithm can be executed concurrently using ExecutorService. Finally, we show a ParallelArray implementation and compare execution times. Note that this example is not meant to show optimal high-performance matrix multiplication. Rather, the intent is simply to show that using ForkJoin is easier than building concurrency on the existing java.util.concurrent capabilities.

Naive Matrix Multiplication

First, here is a naive matrix multiplication algorithm. It is somewhat inefficient because two-dimensional array indexing is a bit slow in Java.

void naiveMatrixMultiply( // c[m][p] = a[m][n] times b[n][p]
        final double[][] a,
        final double[][] b,
        final double[][] c) {
    check(a, b, c); // checks for null args, incorrectly sized matrices, etc. See sidebar below
    final int m = a.length;
    final int n = b.length;
    final int p = b[0].length;
    for (int j = 0; j < p; j++) {
        for (int i = 0; i < m; i++) {
            double s = 0;
            for (int k = 0; k < n; k++) {
                s += a[i][k] * b[k][j];
            }
            c[i][j] = s;
        }
    }
}

Jama Matrix Multiplication

A more efficient algorithm exists in Jama. It improves on the naive algorithm by transposing column j of b into a one-dimensional array Bcolj for use in the inner loop, favoring the faster single-dimension access Bcolj[k] over the slower two-dimensional indexing b[k][j]. It also lifts the loop invariant a[i] out of the innermost loop. The net result is an algorithm that runs in about 25% of the time of the original naive algorithm.

void jamaMatrixMultiply(
        final double[][] a,
        final double[][] b,
        final double[][] c) {
    check(a, b, c);
    final int m = a.length;
    final int n = b.length;
    final int p = b[0].length;

    final double[] Bcolj = new double[n];
    for (int j = 0; j < p; j++) {
        for (int k = 0; k < n; k++) {
            Bcolj[k] = b[k][j];
        }
        for (int i = 0; i < m; i++) {
            final double[] Arowi = a[i];
            double s = 0;
            for (int k = 0; k < n; k++) {
                s += Arowi[k] * Bcolj[k];
            }
            c[i][j] = s;
        }
    }
}

Threaded Matrix Multiplication

Next, let's look at running the matrix multiply concurrently. The first implementation uses ExecutorService and is shown below. As you can see, it is much more complicated. We must manually divide the work into tasks and provide each task with its from/to bounds. Each task runs the entire outer loop body, but only for its own non-overlapping slice of the column indices j in the range 0 <= j < p.

Also note that each task must allocate its own local copy of the transposed column array, Bcolj. Thus, if we run this using T threads, we require T double[n] arrays, whereas the Jama algorithm requires only one transient double[n] array.

Shown in green is the additional or changed code required to run this with an ExecutorService. Unfortunately, you can see that it is a fairly intrusive change; over half the code is dedicated to the parallelization.

void threadedMatrixMultiply(
        final double[][] a,
        final double[][] b,
        final double[][] c, final int numTasks) {
    check(a, b, c);
    final int m = a.length;
    final int n = b.length;
    final int p = b[0].length;
    final ExecutorService executor = Executors.newFixedThreadPool(numTasks);
    for (int interval = numTasks, end = p, size = (int) Math.ceil(p * 1.0 / numTasks);
            interval > 0;
            interval--, end -= size) {
        final int to = end;
        final int from = Math.max(0, end - size);
        final Runnable runnable = new Runnable() {
            public void run() {
                final double[] Bcolj = new double[n];
                for (int j = from; j < to; j++) {
                    for (int k = 0; k < n; k++) {
                        Bcolj[k] = b[k][j];
                    }
                    for (int i = 0; i < m; i++) {
                        final double[] Arowi = a[i];
                        double s = 0;
                        for (int k = 0; k < n; k++) {
                            s += Arowi[k] * Bcolj[k];
                        }
                        c[i][j] = s;
                    }
                }
            }
        };
        executor.execute(runnable);
    }
    try {
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.DAYS); // O(n^3) can take a while!
    } catch (final InterruptedException e) {
        System.out.println("Interrupted.");
        empty(c);
    }
}

You may note that there is no synchronization in this implementation. This is safe because there is no mutable data shared across threads. The input arrays a and b are not mutated. While the contents of the output array c are altered, each thread works on a different slice, c[*][from] to c[*][to-1]. The transient array Bcolj is also confined to the thread where it is allocated, [re]initialized, and used. The method itself, however, is not thread-safe, in that multiple simultaneous invocations are not safe.

See also Doug Lea's divide and conquer matrix multiply. A rough sketch of that general approach is shown below for reference.
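The sketch below is not Doug Lea's original listing; it is a minimal illustration of the divide-and-conquer idea, written against the RecursiveAction API as it appears in java.util.concurrent in later JDKs (the jsr166y package names differed). The column range of the result is split recursively until it falls under a threshold, and each leaf runs the Jama-style loop. The class name and threshold value are placeholders.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// A minimal divide-and-conquer sketch: recursively split the output columns,
// computing each half as a separate fork/join task.
class ColumnRangeMultiplyTask extends RecursiveAction {
    private static final int THRESHOLD = 64; // placeholder leaf size
    private final double[][] a, b, c;
    private final int from, to; // column range [from, to) of c

    ColumnRangeMultiplyTask(double[][] a, double[][] b, double[][] c, int from, int to) {
        this.a = a; this.b = b; this.c = c; this.from = from; this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            multiplyColumns();           // small enough: compute sequentially
        } else {
            int mid = (from + to) >>> 1; // otherwise split the column range in two
            invokeAll(new ColumnRangeMultiplyTask(a, b, c, from, mid),
                      new ColumnRangeMultiplyTask(a, b, c, mid, to));
        }
    }

    private void multiplyColumns() {
        final int m = a.length, n = b.length;
        final double[] Bcolj = new double[n]; // task-local transposed column
        for (int j = from; j < to; j++) {
            for (int k = 0; k < n; k++) {
                Bcolj[k] = b[k][j];
            }
            for (int i = 0; i < m; i++) {
                final double[] Arowi = a[i];
                double s = 0;
                for (int k = 0; k < n; k++) {
                    s += Arowi[k] * Bcolj[k];
                }
                c[i][j] = s;
            }
        }
    }
}

// Usage: new ForkJoinPool().invoke(new ColumnRangeMultiplyTask(a, b, c, 0, b[0].length));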

Matrix Multiplication using ForkJoin

Finally, we show how to adapt the Jama algorithm to jsr166y/forkjoin. We begin by creating a ParallelDoubleArray based on the input array b[0]; this is because the outer loop of the original Jama algorithm iterates over j from 0 to p-1, which is exactly the index range of b[0]. However, we do not care about the contents of the array (see the sidebar below). Next, we use ParallelDoubleArray.replaceWithMappedIndex(Ops.IntAndDoubleToDouble op) to run the loop body in parallel over the entire array. This performs the equivalent of the sequential for loop

for (int j = 0; j < p; j++) { ... }

or the threaded

for (int j = from; j < to; j++) { ... }

but with the ForkJoin framework managing the partitioning and the outer loop for us. It passes the index j to the Ops.IntAndDoubleToDouble operator via its op(int a, double b) method, as well as the value b[0][j], which is unused. The method body transposes Bcolj, then performs the two nested loops. Note that since ForkJoin uses a single operator instance (from the anonymous inner class), we must allocate the transient Bcolj array inside the op method. If this were not a local array inside the operation method but instead an instance field, multiple threads would update it concurrently while working on different columns, causing corruption. (Tim Peierls caught this bug in an earlier version of this algorithm.) Like the threaded example above, this implementation does not require explicit synchronization.

Shown in green is the additional code required to implement the Jama matrix multiplication with ForkJoin:

void forkJoinMatrixMultiply(
        final double[][] a,
        final double[][] b,
        final double[][] c) {
    check(a, b, c);
    final int m = a.length;
    final int n = b.length;
    final ParallelDoubleArray cols = ParallelDoubleArray.createUsingHandoff(b[0],
            ParallelDoubleArray.defaultExecutor());
    cols.replaceWithMappedIndex(new Ops.IntAndDoubleToDouble() {

        public double op(final int j, final double element) {
            final double[] Bcolj = new double[n]; // allocated inside the op, not "outside the loop"
            for (int k = 0; k < n; k++) {
                Bcolj[k] = b[k][j];
            }
            for (int i = 0; i < m; i++) {
                final double[] Arowi = a[i];
                double s = 0;
                for (int k = 0; k < n; k++) {
                    s += Arowi[k] * Bcolj[k];
                }
                c[i][j] = s;
            }
            return element;
        }
    });
}

As you can see, the ForkJoin implementation is less intrusive. This is despite the fact that it must allocate a transient work array inside each operation (a tribute to the JVM memory management).

The inner product computation could be parallelized as well; a sketch of one way to do so follows.
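The originally linked example is not reproduced here. As a minimal sketch, the dot product of a row of a and a transposed column of b could itself be computed with a divide-and-conquer RecursiveTask that splits the index range and sums the partial results; the class name and threshold below are illustrative.

import java.util.concurrent.RecursiveTask;

// Minimal sketch (not the linked example): a fork/join dot product that
// recursively splits the index range and sums partial results.
class DotProductTask extends RecursiveTask<Double> {
    private static final int THRESHOLD = 1024; // illustrative leaf size
    private final double[] x, y;
    private final int from, to; // index range [from, to)

    DotProductTask(double[] x, double[] y, int from, int to) {
        this.x = x; this.y = y; this.from = from; this.to = to;
    }

    @Override
    protected Double compute() {
        if (to - from <= THRESHOLD) {
            double s = 0;
            for (int k = from; k < to; k++) {
                s += x[k] * y[k];
            }
            return s;
        }
        int mid = (from + to) >>> 1;
        DotProductTask left = new DotProductTask(x, y, from, mid);
        DotProductTask right = new DotProductTask(x, y, mid, to);
        left.fork();                       // run the left half asynchronously
        double rightSum = right.compute(); // compute the right half in this thread
        return left.join() + rightSum;     // combine the partial sums
    }
}

// Usage: double s = new ForkJoinPool().invoke(new DotProductTask(Arowi, Bcolj, 0, n));

For the matrix sizes used in this article the per-column inner products are short, so splitting them would mostly add overhead; the sketch is shown only to illustrate the pattern.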

Sidebar: Note that we would have preferred to use apply(Ops.Procedure procedure) instead of replaceWithMappedIndex(Ops.IntAndDoubleToDouble), but the op() method in Ops.Procedure does not have the int index parameter, which is needed for the matrix multiplication: the algorithm's loop body requires the index value j. Finally, because Ops.IntAndDoubleToDouble.op(int a, double b) is not a void method, we must return the input value, which is stored back into the ParallelDoubleArray.

An indexed Procedure interface will be added to ForkJoin to address this. 
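Purely as a hypothetical illustration (the interface and its name are invented here, not part of jsr166y), such an indexed procedure might look like the following, which would let the op body above drop the unused element parameter and the return statement:

// Hypothetical interface, not part of jsr166y: a procedure that also receives the element's index.
interface IntAndDoubleProcedure {
    void op(int index, double element); // void: nothing is written back into the ParallelDoubleArray
}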
 

Sidebar: Below is a ForkJoin-assisted implementation of the check(double[][] a, double[][] b, double[][] c) method used above. It uses an Ops.Procedure to verify that the matrix-multiply arguments are non-null, non-empty, and not ragged, and that the dimensions match.

private void check(final double[][] a,
        final double[][] b,
        final double[][] c) {
    check(a);
    check(b);
    check(c);
    if (c == a || c == b)
        throw new IllegalArgumentException("a or b cannot be used for output c");
    // check dimensionality
    final int am = a.length, an = a[0].length;
    final int bm = b.length, bn = b[0].length;
    final int cm = c.length, cn = c[0].length;
    if (bm != an)
        throw new IllegalArgumentException("b.m != a.n");
    if (cm != am)
        throw new IllegalArgumentException("c.m != a.m");
    if (cn != bn)
        throw new IllegalArgumentException("c.n != b.n");
}

private void check(final double[][] array) {
    if (array == null || array.length == 0 || array[0] == null)
        throw new IllegalArgumentException("Array must be non-null and non-empty.");
    final int length = array[0].length;
    final ParallelArray<double[]> rows = ParallelArray.createUsingHandoff(array,
            ParallelArray.defaultExecutor());
    rows.apply(new Ops.Procedure<double[]>() {
        public void op(final double[] row) {
            if (row == null || row.length != length)
                throw new IllegalArgumentException("Ragged arrays not allowed.");
        }
    });
}

Performance

Here are some simple metrics showing the time required to multiply matrices using the sequential Jama, ExecutorService, and ForkJoin algorithms.

The operation is C = A × B, where A is a 1600 × 1200 matrix, B is 1200 × 1400, and the result C is 1600 × 1400. The results were obtained on a Linux system running Java 1.6.0_01 with two quad-core processors (8 cores total) and 4 GB of RAM. Times are in seconds.

 

number of threads | Sequential | ExecutorService | ForkJoin
2                 | 7.32       | 5.48            | 4.37
4                 | 7.38       | 3.47            | 3.28
8                 | 7.40       | 2.08            | 2.01

One can see that the ForkJoin implementation is comparable to the threaded ExecutorService implementation.
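For completeness, a timing harness along the following lines could reproduce this kind of measurement. The matrix dimensions are those given above; the wrapper class name, warm-up count, and random seeding are assumptions of this sketch.

import java.util.Random;

// A rough timing harness (a sketch; the enclosing class name and warm-up policy are assumptions).
public class MultiplyBenchmark {

    public static void main(final String[] args) {
        final int m = 1600, n = 1200, p = 1400;
        final double[][] a = randomMatrix(m, n), b = randomMatrix(n, p), c = new double[m][p];
        final MatrixMultiplyExample example = new MatrixMultiplyExample(); // class holding the methods above (name assumed)

        for (int run = 0; run < 3; run++) { // a few runs so the JIT warms up
            long start = System.nanoTime();
            example.jamaMatrixMultiply(a, b, c);
            report("sequential Jama", start);

            start = System.nanoTime();
            example.threadedMatrixMultiply(a, b, c, 8);
            report("ExecutorService (8 tasks)", start);

            start = System.nanoTime();
            example.forkJoinMatrixMultiply(a, b, c);
            report("ForkJoin", start);
        }
    }

    private static void report(final String label, final long startNanos) {
        System.out.printf("%-28s %.2f s%n", label, (System.nanoTime() - startNanos) / 1e9);
    }

    private static double[][] randomMatrix(final int rows, final int cols) {
        final Random random = new Random(42);
        final double[][] matrix = new double[rows][cols];
        for (int i = 0; i < rows; i++)
            for (int j = 0; j < cols; j++)
                matrix[i][j] = random.nextDouble();
        return matrix;
    }
}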
