Saturday, September 11, 2010

Not one thread, but many threads to do my work

Most of the time our code executes in a single thread. Take, for example, a web application running on a servlet container. Sure, the container is multi-threaded, but as far as one request is concerned, a single thread executes all the code in its path to complete the request.
This is the norm and is usually sufficient. Then there are use cases where, once the application is put together (or while designing it, if one pays good attention to design), you realise that parts of the logic are simply waiting for their turn on the thread even though they are perfectly capable of running independently of each other.

Take, for example, a search solution that queries multiple data sources to present results to the end user. Assume the data sources are one or more relational databases, an index of some sort (think Lucene) and an external search service. The solution is expected to take a user's search term, search across all of these data sources and provide an aggregated set of results. And that's exactly what the code would do: search one data source at a time, perhaps wrap the results in a common value object, and aggregate them so that they are ready for the user. So in the typical single-threaded execution path, the thread would execute each search in turn and finally run the results through the aggregation. From a concurrency point of view, though, this seems a bit wasteful. The searches have no dependency on each other. The only dependent element in this execution path is the aggregation, which needs all the results to do its magic. This means that if we run all the searches concurrently and then pass the results to the aggregator, we have a good chance of saving some time.
The first thing that comes to mind with this approach is multiple threads. We want the search to spawn multiple threads, each searching one data source, and pass the results to the aggregation logic. The searches then execute concurrently, and the benefit can be very noticeable.
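
As a rough sketch of that idea, assume each data source can be hidden behind a function that takes a search term and returns a list of hits. The class name ConcurrentSearchSketch, the searchAll method and the three stand-in sources are made up for illustration; the java.util.concurrent types it relies on are discussed further down in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: the "data sources" are stand-in functions, not real back ends.
class ConcurrentSearchSketch {

    // Runs every source against the same term concurrently, then aggregates the hits.
    static List<String> searchAll(String term, List<Function<String, List<String>>> sources)
            throws InterruptedException, ExecutionException {
        // One thread per source; a pool needs at least one thread.
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
        try {
            List<Callable<List<String>>> searches = new ArrayList<>();
            for (Function<String, List<String>> source : sources) {
                searches.add(() -> source.apply(term)); // one independent task per source
            }
            // Aggregation is the only step that needs every search to have finished.
            List<String> aggregated = new ArrayList<>();
            for (Future<List<String>> hits : pool.invokeAll(searches)) {
                aggregated.addAll(hits.get());
            }
            return aggregated;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Function<String, List<String>>> sources = new ArrayList<>();
        sources.add(term -> Collections.singletonList("db:" + term));
        sources.add(term -> Collections.singletonList("index:" + term));
        sources.add(term -> Collections.singletonList("service:" + term));
        System.out.println(searchAll("java", sources)); // [db:java, index:java, service:java]
    }
}
```

The results come back in the order the sources were listed, regardless of which search finishes first, so the aggregation step stays deterministic.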

Messing around with multiple threads, however, is something most developers avoid because of the inherent complexities. Thread pooling, thread termination, synchronisation, deadlocks and resource sharing are a few of the complexities that a developer focused on application development might tend to stay away from. But since the java.util.concurrent package was included in the JDK (as of Java 5), things have become simpler. Thanks to Doug Lea, the package offers out-of-the-box thread pooling, queues, mutexes, efficient thread initialisation and destruction, efficient locking and so on: a comprehensive set of building blocks that lets application developers introduce concurrency into the application flow easily.

While the above use case is an ideal candidate for adopting a concurrent execution model, a simple framework at https://code.google.com/p/concurrent-it/ shows a simple usage of the API to achieve concurrency. It was put together to run concurrent tasks in various use cases in a recent project. The code is quite straightforward, and the test provided with it does nothing but compute a Fibonacci series and factorials. The computations, however, are made deliberately expensive so that they take noticeably long to complete.

To touch on the gist of it, the main building blocks used from the java.util.concurrent package are briefly discussed here.

ExecutorService:
An ExecutorService provides the means of initiating, managing and shutting down threads. The JDK provides several implementations, and a factory utility called Executors provides the methods to create them. The API documentation of the different types of ExecutorService is a good point of reference (as always). Typically you would obtain an ExecutorService as follows:

ExecutorService execService = Executors.newCachedThreadPool(); 
This would give you an ExecutorService backed by a cached thread pool, which creates new threads as needed and reuses idle ones.
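
A cached pool puts no upper bound on the number of threads, and whichever pool is chosen should be shut down once it is no longer needed. The following is a minimal sketch of that lifecycle using a fixed-size pool instead; the class name PoolLifecycleSketch, the method runAndShutDown, the pool size of 4 and the 5 second wait are arbitrary choices made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolLifecycleSketch {

    // Runs the given work on a pool thread and reports whether the pool wound down in time.
    static boolean runAndShutDown(Runnable work) throws InterruptedException {
        // A fixed pool caps the number of worker threads; 4 is an arbitrary choice here.
        ExecutorService execService = Executors.newFixedThreadPool(4);
        execService.execute(work); // hand the work to one of the pool's threads
        execService.shutdown();    // accept no new tasks; submitted ones still run
        return execService.awaitTermination(5, TimeUnit.SECONDS); // wait for them to finish
    }

    public static void main(String[] args) throws InterruptedException {
        boolean finished = runAndShutDown(
                () -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("pool terminated: " + finished);
    }
}
```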

Callable:
The Callable interface is similar to the Runnable interface (in the thread API), so implementors of this interface can run on a different thread. The difference from Runnable is that a Callable returns a value and may throw a checked exception (if there's one to be thrown, of course).

import java.util.concurrent.Callable;

public class Processor implements Callable<AsyncResult> {

    // call() is the single method the Callable contract requires
    @Override
    public AsyncResult call() throws ConcurrentException {
        // implementation placed here
    }
}

Like the run() method of the Runnable interface, the Callable interface has a call() method that implementors are contracted to implement. As per the example above, our implementation returns an AsyncResult when call() is called. The call() method is invoked by the ExecutorService, and the implementation can be thought of as a task that you'd want executed in its own thread.
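
For something concrete to look at, here is a small, self-contained task in the same spirit. FactorialTask is a hypothetical name and is not taken from the concurrent-it code; it simply returns a BigInteger instead of an AsyncResult.

```java
import java.math.BigInteger;
import java.util.concurrent.Callable;

// Hypothetical task; it is not part of the concurrent-it code base.
class FactorialTask implements Callable<BigInteger> {

    private final int n;

    FactorialTask(int n) {
        this.n = n;
    }

    // An ExecutorService would invoke call() on one of its worker threads.
    @Override
    public BigInteger call() {
        BigInteger result = BigInteger.ONE;
        for (int i = 2; i <= n; i++) {
            result = result.multiply(BigInteger.valueOf(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Calling call() directly runs the task on the current thread.
        System.out.println(new FactorialTask(5).call()); // 120
    }
}
```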

Putting it all together:
Now we've got the task we want invoked in a separate thread (the Callable implementation) and an ExecutorService that dishes out threads and manages their execution. To kick it all off, all we have to do is:

List<Processor> tasks = // build a new list of Processor objects

List<Future<AsyncResult>> results = execService.invokeAll(tasks);


The invokeAll(Collection) method takes care of submitting all tasks (Processor objects in our case) for execution across multiple threads concurrently, and returns once all of them have completed. As mentioned above, the ExecutorService invokes the call() method of each task on one of its threads.

Future:
The next important thing about the invokeAll(Collection) call is its return value: a list of Future objects. A Future is a holder for the result of an asynchronous computation. This is quite a powerful concept, as it lets us take control of the results of multiple threads. A simple get() method provides the means of retrieving the result and, as listed above, the type of the result is whatever the call() method returns. The get() method blocks if the result is not yet available (with invokeAll the tasks have already completed by the time the futures are returned, so this matters mostly for tasks handed over individually via submit()). If call() threw an exception, get() rethrows it wrapped in an ExecutionException. Thus the result of each task is accessible and can be put to use thereafter.
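
A minimal sketch of handling the outcomes of get() might look like the following. The class name FutureGetSketch, the describe method and the two second timeout are assumptions made for illustration; the task is handed over with submit() so that get() really does have something to wait for.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetSketch {

    // Returns the task's value, or a short description of why none was produced.
    static String describe(Callable<String> task) throws InterruptedException {
        ExecutorService execService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = execService.submit(task);
            // Wait at most two seconds instead of blocking indefinitely.
            return future.get(2, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // Whatever call() threw is wrapped and available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } finally {
            execService.shutdownNow(); // interrupts the task if it is still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(describe(() -> "done")); // done
        System.out.println(describe(() -> {
            throw new IllegalStateException("boom");
        })); // failed: boom
    }
}
```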

Putting it all together:
A quick look at how these lines relate to the main thread of execution and the multiple threads spawned by the ExecutorService:

List<Processor> tasks = // build a new list of Processor objects // main thread
List<Future<AsyncResult>> results = execService.invokeAll(tasks); // main thread hands the tasks to multiple worker threads

for (Future<AsyncResult> result : results) {
    AsyncResult res = result.get(); // main thread. Would block if necessary
    // do something with the result
}
That's it. Using these building blocks from the API, we have a nice, straightforward way to run our work concurrently. While using the API is simple, applying concurrency needs to be done carefully. It needs to be thought about while designing the application, and load and stress testing for thread and resource utilisation is a must. Once implemented, a run through a profiler to see how each thread behaves should be on the to-do list as well. Also worth noting is that tooling support (debugging, for instance) is not yet quite mature for the concurrent programming model.

The example contains a test that demonstrates the usage of the simple framework, and it's accessible here - https://code.google.com/p/concurrent-it/. The test shows perhaps the easiest advantage to gain from the java.util.concurrent package, which is speed-up through parallelism.