Wednesday, May 13, 2015

Using Java Thread Pools

Here's a quick (and somewhat dirty) solution in Java to process a set of tasks in parallel. It does not require any third party libraries. Users can specify the tasks to be executed by implementing the Task interface. Then, a collection of Task instances can be passed to the TaskFarm.processInParallel method. This method will farm out the tasks to a thread pool and wait for them to finish. When all tasks have finished, it will gather their outputs, put them in another collection, and return it as the final outcome of the method invocation.
This solution also provides some control over the number of threads that will be employed to process the tasks. If a positive value is provided as the max argument, it will use a fixed thread pool with an unbounded queue to ensure that no more than 'max' tasks will be executed in parallel at any time. By specifying a non-positive value for the max argument, the caller can request the TaskFarm to use as many threads as needed.
If any of the Task instances throws an exception, the processInParallel method will throw as well: the failure surfaces through the ExecutionException raised by Future.get when the results are collected.
package edu.ucsb.cs.eager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class TaskFarm<T> {

    /**
     * Process a collection of tasks in parallel. Wait for all tasks to finish, and then
     * return all the results as a collection.
     *
     * @param tasks The collection of tasks to be processed
     * @param max Maximum number of parallel threads to employ (non-positive values
     *            indicate no upper limit on the thread count)
     * @return A collection of results
     * @throws Exception If at least one of the tasks fail to complete normally
     */
    public Collection<T> processInParallel(Collection<Task<T>> tasks, int max) throws Exception {
        ExecutorService exec;
        if (max <= 0) {
            exec = Executors.newCachedThreadPool();
        } else {
            exec = Executors.newFixedThreadPool(max);
        }

        try {
            List<Future<T>> futures = new ArrayList<>();

            // farm it out...
            for (final Task<T> task : tasks) {
                Future<T> f = exec.submit(new Callable<T>() {
                    @Override
                    public T call() throws Exception {
                        return task.process();
                    }
                });
                futures.add(f);
            }

            List<T> results = new ArrayList<>();

            // wait for the results
            for (Future<T> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            exec.shutdownNow();
        }
    }

}
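For completeness, here's a minimal usage sketch. It assumes the Task interface (not shown in this post) declares a single T process() throws Exception method -- that's how the TaskFarm code invokes it:
package edu.ucsb.cs.eager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class TaskFarmDemo {

    public static void main(String[] args) throws Exception {
        List<Task<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            // A hypothetical task that simply squares its input.
            tasks.add(new Task<Integer>() {
                @Override
                public Integer process() throws Exception {
                    return n * n;
                }
            });
        }
        // Run the tasks on at most 4 threads.
        Collection<Integer> results = new TaskFarm<Integer>().processInParallel(tasks, 4);
        System.out.println("Computed " + results.size() + " results");
    }
}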

Tuesday, May 5, 2015

Parsing Line-Oriented Text Files Using Go

The following example demonstrates several features of Golang, such as reading a file line by line (with error handling), deferred statements, and higher-order functions.
package main

import (
  "bufio"
  "fmt"
  "os"
)

// ParseLines applies the parse function to every line in the given file, and
// collects the outputs for which parse returns true.
func ParseLines(filePath string, parse func(string) (string, bool)) ([]string, error) {
  inputFile, err := os.Open(filePath)
  if err != nil {
    return nil, err
  }
  defer inputFile.Close()

  scanner := bufio.NewScanner(inputFile)
  var results []string
  for scanner.Scan() {
    if output, add := parse(scanner.Text()); add {
      results = append(results, output)
    }
  }
  if err := scanner.Err(); err != nil {
    return nil, err
  }
  return results, nil
}

func main() {
  if len(os.Args) != 2 {
    fmt.Println("Usage: line_parser <file_path>")
    return
  }

  lines, err := ParseLines(os.Args[1], func(s string) (string, bool) {
    return s, true
  })
  if err != nil {
    fmt.Println("Error while parsing file", err)
    return
  }

  for _, l := range lines {
    fmt.Println(l)
  }
}
The ParseLines function takes a path (filePath) to an input file, and a function (parse) that is applied to each line read from the file. The parse function returns a (string, bool) pair, where the boolean indicates whether the string should be included in the final result of ParseLines. The example above simply reads and prints all the lines of the input file.
The caller can inject more sophisticated transformation and filtering logic into ParseLines via the parse function. The following invocation filters out all the lines that do not begin with the prefix "[valid]", and extracts the third field from each remaining line (assuming a simple whitespace-separated line format). Note that it also requires importing the strings package.
lines, err := ParseLines(os.Args[1], func(s string) (string, bool) {
  if strings.HasPrefix(s, "[valid] ") {
    return strings.Fields(s)[2], true
  }
  return s, false
})
A function like ParseLines is suitable for parsing small to moderately large files. However, since it accumulates all the results in memory, it can become a problem for very large inputs.
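For such cases, a streaming variant that never accumulates results is a better fit. Here's a minimal sketch (my addition, not from the original post) that pushes the per-line handling into a callback:
package main

import (
  "bufio"
  "fmt"
  "os"
)

// EachLine is a hypothetical streaming variant of ParseLines: it invokes
// handle on every line instead of collecting results in a slice.
func EachLine(filePath string, handle func(string) error) error {
  inputFile, err := os.Open(filePath)
  if err != nil {
    return err
  }
  defer inputFile.Close()

  scanner := bufio.NewScanner(inputFile)
  for scanner.Scan() {
    if err := handle(scanner.Text()); err != nil {
      return err
    }
  }
  return scanner.Err()
}

func main() {
  err := EachLine(os.Args[1], func(s string) error {
    fmt.Println(s) // process each line as soon as it is read
    return nil
  })
  if err != nil {
    fmt.Println("Error while parsing file", err)
  }
}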

Friday, March 20, 2015

QBETS: A Time Series Analysis and Forecasting Method

Today I’m going to share some details on an analytics technology I’ve been using for my research.
QBETS (Queue Bounds Estimation from Time Series) is a non-parametric time series analysis method. The basic idea behind QBETS is to analyze a time series, and predict the p-th percentile of it, where p is a user-specified parameter. QBETS learns from the existing data points in the input time series, and estimates a p-th percentile value such that the next data point in the time series has a 0.01p probability of being less than or equal to the estimated value.
For example, suppose we have the following input time series, and we wish to predict the 95th percentile of it:

A_0, A_1, A_2, ..., A_n

If QBETS predicts the value Q as the 95th percentile, we can say that A_{n+1} (the next data point that will be added to the time series by the generating process) has a 95% chance of being less than or equal to Q. More generally, for the p-th percentile:

P(A_{n+1} <= Q) = 0.01p

Since QBETS cannot determine percentile values exactly, it estimates them, using another parameter c (0 < c < 1) as an upper confidence bound on the estimates. That is, if QBETS is used to estimate the p-th percentile of a time series with upper confidence c, it will overestimate the true p-th percentile with a probability of 1 - c. For instance, if c = 0.05, QBETS generates predictions that overestimate the true p-th percentile 95% of the time. We primarily use the parameter c as a means of controlling how conservative QBETS should be when predicting percentiles.
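As I understand it, the machinery behind such bounds is the classic binomial argument over order statistics: if you sort n observations in ascending order, the probability that the k-th smallest value is at least the true p-th percentile is the Binomial(n, 0.01p) CDF evaluated at k - 1. The sketch below (mine, not the authors' code; the function names and the fall-back-to-maximum behavior are my assumptions) picks the smallest order statistic that reaches a desired confidence level:
package main

import (
  "fmt"
  "math"
  "sort"
)

// binomPMF computes P(Binomial(n, p) = k), working in log space for stability.
func binomPMF(n, k int, p float64) float64 {
  lgN, _ := math.Lgamma(float64(n + 1))
  lgK, _ := math.Lgamma(float64(k + 1))
  lgNK, _ := math.Lgamma(float64(n - k + 1))
  return math.Exp(lgN - lgK - lgNK + float64(k)*math.Log(p) + float64(n-k)*math.Log(1-p))
}

// upperBoundIndex returns the smallest k (1-based) such that the k-th
// smallest of n observations overestimates the p-th quantile with
// probability at least conf, i.e. P(Binomial(n, p) < k) >= conf.
// If no order statistic suffices, it falls back to n (the sample maximum).
func upperBoundIndex(n int, p, conf float64) int {
  cdf := 0.0
  for k := 0; k < n; k++ {
    cdf += binomPMF(n, k, p)
    if cdf >= conf {
      return k + 1
    }
  }
  return n
}

func main() {
  // 100 synthetic measurements cycling through the values 7, 8 and 9.
  var data []float64
  for i := 0; i < 100; i++ {
    data = append(data, 7+float64(i%3))
  }
  sort.Float64s(data)
  k := upperBoundIndex(len(data), 0.95, 0.95)
  fmt.Printf("95th percentile bound at 95%% confidence: %v (order statistic %d of %d)\n",
    data[k-1], k, len(data))
}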
QBETS also supports a technique known as change point detection. To understand what this means, let’s look at the following input time series.

7, 8, 7, 7, 9, 8, 7, 7, 15, 15, 16, 14, 16, 17, 15

Here we see a sudden shift in the values after the first 8 data points. The individual data point values have increased from the 7-9 range to the 14-17 range. QBETS detects such change points in the time series, and discards the data points that precede them. This is necessary to make sure that the predictions are not influenced by old historical values that are no longer relevant to the time series generating process.
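QBETS's actual change point test is described in the paper. Purely to illustrate the idea, here is a toy sketch (my own construction, not the authors' algorithm) that compares the median of a recent window against the median of the older history, and truncates the series when the two diverge:
package main

import (
  "fmt"
  "sort"
)

// median returns the middle value of xs (xs is copied, not modified).
func median(xs []float64) float64 {
  c := append([]float64(nil), xs...)
  sort.Float64s(c)
  return c[len(c)/2]
}

// trimAtChangePoint is a toy stand-in for QBETS's change point handling:
// if the median of the last w points differs from the median of everything
// before them by more than the given factor, only the recent points are kept.
func trimAtChangePoint(series []float64, w int, factor float64) []float64 {
  if len(series) <= w {
    return series
  }
  old, recent := series[:len(series)-w], series[len(series)-w:]
  m1, m2 := median(old), median(recent)
  if m2 > m1*factor || m1 > m2*factor {
    return recent
  }
  return series
}

func main() {
  series := []float64{7, 8, 7, 7, 9, 8, 7, 7, 15, 15, 16, 14, 16, 17, 15}
  // Prints only the post-shift values: [15 15 16 14 16 17 15]
  fmt.Println(trimAtChangePoint(series, 7, 1.5))
}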
The paper that originally introduced QBETS used it to predict scheduling delays in batch queuing systems for supercomputers and other HPC systems. Over the years, researchers have used QBETS with a wide range of datasets, and it has produced positive results in almost all cases. Lately, I have been using QBETS to predict API response times by analyzing historical API performance data. Again, the results have been quite promising.

To learn more about QBETS, go through the paper or contact the authors.

Sunday, January 11, 2015

Creating Eucalyptus Machine Images from a Running VM

I often use the Eucalyptus private cloud platform for my research. Very often I need to start Linux VMs in Eucalyptus and install a whole stack of software on them. This involves a lot of repetitive work, so to save time I prefer creating machine images (EMIs) from fully configured VMs. This post outlines the steps one should follow to create an EMI from a VM running in Eucalyptus (tested on Ubuntu Lucid and Precise VMs).

Step 1: SSH into the VM running in Eucalyptus, if you already haven't.

Step 2: Run the euca-bundle-vol command to create an image file (snapshot) from the VM's root file system.
euca-bundle-vol -p root -d /mnt -s 10240
Here "-p" is the name you wish to give to the image file. "-s" is the size of the image in megabytes. In the above example, this is set to 10GB, which also happens to be the largest acceptable value for "-s" argument. "-d" is the directory in which the image file should be placed. Make sure this directory has enough free space to accommodate the image size specified in "-s". 
This command may take several minutes to execute. For a 10GB image, it may take around 3 to 8 minutes. When completed, check the contents of the directory specified in argument "-d". You will see an XML manifest file and a number of image part files in there.

Step 3: Upload the image file to the Eucalyptus cloud using the euca-upload-bundle command.
euca-upload-bundle -b my-test-image -m /mnt/root.manifest.xml
Here "-b" is the name of the bucket (in Walrus key-value store) to which the image file should be uploaded. You don't have to create the bucket beforehand. This command will create the bucket if it doesn't already exist. "-m" should point to the XML manifest file generated in the previous step.
This command requires certain environment variables to be exported (primarily access keys and certificate paths). The easiest way to do that is to copy your eucarc file and the associated keys into the VM and source the eucarc file into the environment.
This command may also take several minutes to complete. At the end, it will output a string of the form "bucket-name/manifest-file-name".

Step 4: Register the newly uploaded image file with Eucalyptus.
euca-register my-test-image/root.manifest.xml
The only parameter required here is the "bucket-name/manifest-file-name" string returned by the previous step. I've noticed that in some cases, running this command from the VM in Eucalyptus doesn't work (you will get a "404 not found" error). In that case, simply run the command from somewhere outside the Eucalyptus cloud. If all goes well, the command will return an EMI ID. At this point you can launch instances of your image using the euca-run-instances command.

Friday, January 2, 2015

Developing Web Services with Go

Golang facilitates implementing powerful web applications and services using a very small amount of code. It can be used to implement both HTML-rendering webapps and XML/JSON web APIs. In this post, I'm going to demonstrate how easy it is to implement a simple JSON-based web service using Go. We are going to implement a simple addition service that takes two integers as the input, and returns their sum as the output.
package main

import (
        "encoding/json"
        "net/http"
)

type addReq struct {
        Arg1,Arg2 int
}

type addResp struct {
        Sum int
}

func addHandler(w http.ResponseWriter, r *http.Request) {
        decoder := json.NewDecoder(r.Body)
        var req addReq
        if err := decoder.Decode(&req); err != nil {
                http.Error(w, err.Error(), http.StatusBadRequest)
                return
        }
 
        jsonString, err := json.Marshal(addResp{Sum: req.Arg1 + req.Arg2})
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write(jsonString)
}

func main() {
        http.HandleFunc("/add", addHandler)
        log.Fatal(http.ListenAndServe(":8080", nil))
}
Let's review the code from top to bottom. First we import the packages we need: encoding/json provides the functions for parsing and marshaling JSON messages, net/http enables processing HTTP requests, and log lets us report a fatal error if the server fails to start. Then we define two data types (addReq and addResp) to represent the incoming JSON request and the outgoing JSON response. Note how addReq contains two integers (Arg1, Arg2) for the two input values, and addResp contains only one integer (Sum) for holding the total.
Next we define what is called an HTTP handler function, which implements the logic of our web service. This function parses the incoming request into an instance of the addReq struct, computes the sum, wraps it in an addResp struct, and serializes that into JSON. The resulting JSON string is then written out using the http.ResponseWriter object.
Finally, we have a main function that ties everything together and starts the web service. This main function simply registers our HTTP handler under the "/add" URL context, and starts an HTTP server on port 8080. This means any request sent to the "/add" URL will be dispatched to the addHandler function for processing.
That's all there is to it. You may compile and run the program to try it out. Use curl as follows to send a test request:
curl -v -X POST -d '{"Arg1":5, "Arg2":4}' http://localhost:8080/add
You will get a JSON response back with the total: {"Sum":9}.

Wednesday, December 3, 2014

Controlled Concurrency with Golang

Lately I have been doing a lot of programming in Golang. It is one of those languages that is somewhat difficult to fully grasp at the beginning. But a few hundred lines of code later, you feel like you cannot get enough of it -- very simple syntax, brilliant performance, and very clean and precise API semantics. This language has got it all.
Concurrent programming is one area where Golang really excels. Goroutines make it trivial to start concurrent threads of execution, channels are a first-class programming construct, and a plethora of built-in utilities and packages (e.g. sync) make the developer's life a lot easier. In this post I'm going to give a brief overview of how to instantiate new threads of execution in Golang. Let's start with a piece of sequential code:
for i := 0; i < len(array); i++ {
  doSomething(array[i])
}
The above code iterates over an array, and calls the function doSomething for each element in the array. But this code is sequential, which means doSomething(n) won't be called until doSomething(n-1) returns. Suppose you want to speed things up by running multiple invocations of doSomething in parallel (assuming it is safe to do so -- both control- and data-wise). In Golang this is all you have to do:
for i := 0; i < len(array); i++ {
  go doSomething(array[i])
}
The go keyword will start the doSomething function as a separate concurrent goroutine. But this code change causes an uncontrolled concurrency situation. In other words, the only thing that's limiting the number of parallel goroutines spawned by the program is the length of the array, which is not a good idea if the array has thousands of entries. Ideally, we need to put some kind of a fixed cap on how many goroutines are spawned by the loop. This can be easily achieved by using a channel with a fixed capacity.
c := make(chan bool, 8)
for i := 0; i < len(array); i++ {
  c <- true
  go func(index int) {
    doSomething(array[index])
    <-c
  }(i)
}
We start by creating a channel that can hold at most 8 boolean values. Then inside the loop, whenever we spawn a goroutine, we first send a boolean value (true) into the channel. This operation blocks if the channel is already full (i.e. it already holds 8 elements). Then in the goroutine, we remove an element from the channel before we return. This little trick makes sure that at most 8 parallel goroutines will be active in the program at any given time. If you need to change this limit, you simply have to change the capacity of the channel. You can set it to a fixed number, or compute a suitable value based on the number of CPU cores available in the system (e.g. using runtime.NumCPU()).
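One caveat worth noting (my addition, not from the original post): the loop above returns as soon as the last goroutine has been started, not when it has finished. If you also need to wait for all the invocations to complete, the channel trick combines naturally with a sync.WaitGroup. Here's a self-contained sketch, with doSomething standing in for the real work:
package main

import (
  "fmt"
  "sync"
)

// doSomething is a placeholder for the real per-element work.
func doSomething(n int) {
  fmt.Println(n * n)
}

func main() {
  array := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

  c := make(chan bool, 8) // at most 8 goroutines in flight
  var wg sync.WaitGroup
  for i := 0; i < len(array); i++ {
    c <- true // blocks while 8 goroutines are already running
    wg.Add(1)
    go func(index int) {
      defer wg.Done()
      doSomething(array[index])
      <-c // free up a slot for the next goroutine
    }(i)
  }
  wg.Wait() // block until every goroutine has finished
}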