Wednesday, May 13, 2015

Using Java Thread Pools

Here's a quick (and somewhat dirty) solution in Java to process a set of tasks in parallel. It does not require any third-party libraries. Users specify the tasks to be executed by implementing the Task interface, whose process() method returns the task's result. A collection of Task instances can then be passed to the TaskFarm.processInParallel method, which farms the tasks out to a thread pool and waits for them to finish. Once all tasks have finished, it gathers their outputs into another collection and returns that collection as the result of the method invocation.
This solution also provides some control over the number of threads that will be employed to process the tasks. If a positive value is provided as the max argument, it will use a fixed thread pool with an unbounded queue to ensure that no more than 'max' tasks will be executed in parallel at any time. By specifying a non-positive value for the max argument, the caller can request the TaskFarm to use as many threads as needed.
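If any of the Task instances throws an exception, the processInParallel method will also throw an exception: Future.get() reports the failure as an ExecutionException whose cause is the original exception. The finally block then calls shutdownNow(), which attempts to cancel any tasks that are still queued or running.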
package edu.ucsb.cs.eager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class TaskFarm<T> {

    /**
     * Process a collection of tasks in parallel. Wait for all tasks to finish, and then
     * return all the results as a collection.
     *
     * @param tasks The collection of tasks to be processed
     * @param max Maximum number of parallel threads to employ (non-positive values
     *            indicate no upper limit on the thread count)
     * @return A collection of results
     * @throws Exception If at least one of the tasks fails to complete normally
     */
    public Collection<T> processInParallel(Collection<Task<T>> tasks, int max) throws Exception {
        ExecutorService exec;
        if (max <= 0) {
            exec = Executors.newCachedThreadPool();
        } else {
            exec = Executors.newFixedThreadPool(max);
        }

        try {
            List<Future<T>> futures = new ArrayList<>();

            // farm it out...
            for (final Task<T> task : tasks) {
                // Typed (non-raw) Task and Future keep the result type T intact
                Future<T> f = exec.submit(new Callable<T>() {
                    @Override
                    public T call() throws Exception {
                        return task.process();
                    }
                });
                futures.add(f);
            }

            List<T> results = new ArrayList<>();

            // wait for the results; get() blocks until the task completes
            for (Future<T> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            exec.shutdownNow();
        }
    }

}

Tuesday, May 5, 2015

Parsing Line-Oriented Text Files Using Go

The following example demonstrates several features of Go, such as reading a file line by line (with error handling), deferred statements, and higher-order functions.
package main

import (
 "bufio"
 "fmt"
 "os"
)

func ParseLines(filePath string, parse func(string) (string, bool)) ([]string, error) {
  inputFile, err := os.Open(filePath)
  if err != nil {
    return nil, err
  }
  defer inputFile.Close()

  scanner := bufio.NewScanner(inputFile)
  var results []string
  for scanner.Scan() {
    if output, add := parse(scanner.Text()); add {
      results = append(results, output)
    }
  }
  if err := scanner.Err(); err != nil {
    return nil, err
  }
  return results, nil
}

func main() {
  if len(os.Args) != 2 {
    fmt.Println("Usage: line_parser ")
    return
  }

  lines, err := ParseLines(os.Args[1], func(s string) (string, bool) {
    return s, true
  })
  if err != nil {
    fmt.Println("Error while parsing file", err)
    return
  }

  for _, l := range lines {
    fmt.Println(l)
  }
}
The ParseLines function takes a path (filePath) to an input file, and a function (parse) that will be applied to each line read from the input file. The parse function should return a (string, bool) pair, where the bool value indicates whether the string should be added to the final result of ParseLines. The example shows how to simply read and print all the lines of the input file.
The caller can inject more sophisticated transformation and filtering logic into ParseLines via the parse function. The following example invocation filters out all the lines that do not begin with the prefix "[valid] ", and extracts the third field from each remaining line (assuming a simple whitespace-separated line format).
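// Requires "strings" in the import list.
lines, err := ParseLines(os.Args[1], func(s string) (string, bool) {
   if strings.HasPrefix(s, "[valid] ") {
     // Skip lines with fewer than three fields to avoid an index-out-of-range panic
     if fields := strings.Fields(s); len(fields) >= 3 {
       return fields[2], true
     }
   }
   return s, false
})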
A function like ParseLines is suitable for parsing small to moderately large files. However, if the input file is very large, ParseLines may consume an excessive amount of memory, since it accumulates all the results in a slice before returning.