Wednesday, May 13, 2015

Using Java Thread Pools

Here's a quick (and somewhat dirty) solution in Java for processing a set of tasks in parallel. It does not require any third-party libraries. Users specify the tasks to be executed by implementing the Task interface, then pass a collection of Task instances to the TaskFarm.processInParallel method. This method farms the tasks out to a thread pool and waits for them to finish. When all tasks have finished, it gathers their outputs into another collection and returns that collection as the result of the method invocation.
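The Task interface itself is not listed in this post. Here is a minimal sketch of what it might look like, assuming a single process() method that returns a T and may throw (which is what the task.process() call in TaskFarm suggests). SquareTask and TaskSketchDemo are hypothetical names used only for illustration.

```java
// Assumed shape of the Task interface; the post does not show its definition.
interface Task<T> {
    T process() throws Exception;
}

// Hypothetical example task: squares an integer.
class SquareTask implements Task<Integer> {
    private final int n;

    SquareTask(int n) {
        this.n = n;
    }

    @Override
    public Integer process() {
        return n * n;
    }
}

class TaskSketchDemo {
    public static void main(String[] args) throws Exception {
        Task<Integer> task = new SquareTask(7);
        System.out.println(task.process()); // prints 49
    }
}
```

A collection of such instances is what processInParallel expects as its first argument.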
This solution also provides some control over the number of threads used to process the tasks. If a positive value is provided as the max argument, it uses a fixed thread pool with an unbounded queue, which ensures that no more than 'max' tasks execute in parallel at any time. By specifying a non-positive value for the max argument, the caller asks TaskFarm to use as many threads as needed (a cached thread pool).
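To see the effect of the max argument in isolation, here is a rough sketch that applies the same pool selection rule and records how many tasks were observed running at once. PoolLimitDemo, newPool and peakConcurrency are hypothetical names, not part of TaskFarm, and the 20 ms sleep is just a stand-in for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class PoolLimitDemo {

    // Same selection rule as described above: a non-positive max means no cap.
    static ExecutorService newPool(int max) {
        return max <= 0 ? Executors.newCachedThreadPool()
                        : Executors.newFixedThreadPool(max);
    }

    // Runs taskCount short tasks and returns the highest number seen running at once.
    static int peakConcurrency(int max, int taskCount) {
        ExecutorService exec = newPool(max);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(exec.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max); // remember the maximum
                        Thread.sleep(20);                      // simulate some work
                        running.decrementAndGet();
                        return null;
                    }
                }));
            }
            for (Future<Void> f : futures) {
                f.get(); // wait for every task to finish
            }
            return peak.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("max=2 -> peak " + peakConcurrency(2, 8));
        System.out.println("max=0 -> peak " + peakConcurrency(0, 8));
    }
}
```

With max=2 the reported peak can never exceed 2, because the fixed pool only has two worker threads and the remaining tasks wait in the queue. With max=0 the cached pool is free to start a new thread for each task, so the peak depends on timing.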
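If any of the Task instances throws an exception, the processInParallel method will also throw an exception. Specifically, Future.get() wraps the task's exception in an ExecutionException, and the shutdownNow() call in the finally block then interrupts any tasks that are still running and discards those still waiting in the queue.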
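The failure behavior comes from the standard Future.get() contract rather than anything specific to TaskFarm. Here is a small sketch of it, with FailurePropagationDemo and causeMessageOfFailedTask as hypothetical names: a task that throws surfaces as an ExecutionException, and the original exception is available through getCause().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailurePropagationDemo {

    // Submits one failing task and returns the message of the original exception,
    // recovered from the ExecutionException thrown by Future.get().
    static String causeMessageOfFailedTask() {
        ExecutorService exec = Executors.newFixedThreadPool(1);
        try {
            Future<String> f = exec.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    throw new IllegalStateException("task failed");
                }
            });
            try {
                f.get();
                return null; // not reached: the task always throws
            } catch (ExecutionException e) {
                return e.getCause().getMessage(); // unwrap the task's own exception
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return null;
            }
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessageOfFailedTask()); // prints "task failed"
    }
}
```

Callers of processInParallel who care about the underlying error would therefore catch ExecutionException and inspect its cause.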
package edu.ucsb.cs.eager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class TaskFarm<T> {

    /**
     * Process a collection of tasks in parallel. Wait for all tasks to finish, and then
     * return all the results as a collection.
     *
     * @param tasks The collection of tasks to be processed
     * @param max Maximum number of parallel threads to employ (non-positive values
     *            indicate no upper limit on the thread count)
     * @return A collection of results
     * @throws Exception If at least one of the tasks fails to complete normally
     */
    public Collection<T> processInParallel(Collection<Task<T>> tasks, int max) throws Exception {
        ExecutorService exec;
        if (max <= 0) {
            exec = Executors.newCachedThreadPool();
        } else {
            exec = Executors.newFixedThreadPool(max);
        }

        try {
            List<Future<T>> futures = new ArrayList<>();

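            // farm it out: wrap each task in a Callable and submit it to the pool
            for (Task<T> t : tasks) {
                final Task<T> task = t;
                Future<T> f = exec.submit(new Callable<T>() {
                    @Override
                    public T call() throws Exception {
                        return task.process();
                    }
                });
                futures.add(f);
            }

            List<T> results = new ArrayList<>();

            // wait for the results, in submission order; get() blocks until the
            // task is done and rethrows a task failure as an ExecutionException
            for (Future<T> f : futures) {
                results.add(f.get());
            }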
            return results;
        } finally {
            exec.shutdownNow();
        }
    }

}
