June 11, 2013

MapReduce made simple with Akka

As you probably already know, Akka can make your life a lot easier when it comes to things like concurrency and scalability. A very typical use case for starting to use Akka is when you have a computation or algorithm of some sort that lends itself to parallelizing. Implementing that with just core Java is cumbersome and error-prone, and it will feel like you are reinventing the wheel every time you do it. Akka will, for example, enable you to take advantage of all the cores in your CPU, or maybe even the CPUs of several machines, without having to worry about the evils of things like threads, locks and semaphores, not to mention the hassle of trying to distribute the computations.

This post is a quick tip on how you can use Akka's built-in functional features to solve a map-reduce-like problem in a very simple way. We will be using Java as the base for our examples and some basic knowledge of Akka is assumed.

So let's say you have taken your problem and solved it by splitting it up into smaller units of work and placing those into some sort of "worker" actor. When you need to perform your work, you simply create as many workers as you see fit and let them do the work. The work will be done concurrently, so you will be able to take advantage of all the cores of your CPU.

The next question you will have to concern yourself with is how to gather the results from all the individual workers. One solution that you might think of is to have your "parent" actor (the one responsible for creating the workers) start all the workers and then let them report back their results by sending the parent a message. This would involve having the parent keep track of when all workers have reported back and then send the total result to the actor that sent the original request to the parent. That, in turn, also means that the parent actor needs to keep a reference to the sender of the original request in order to report back the total result (and to do so without falling into the trap of shared mutable state).
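To make that bookkeeping concrete, here is a minimal plain-Java sketch of the state such a parent would have to carry. ResultCollector and its methods are hypothetical names made up for illustration, not part of Akka. Inside an actor this state would only be touched from onReceive(), which handles one message at a time, so the sketch uses no synchronization.

```java
// Hypothetical helper: the state a parent would need in order to
// collect partial results by hand.
final class ResultCollector {
    private final int expected; // number of workers that were started
    private int received = 0;   // number of workers that have replied so far
    private int total = 0;      // running sum of the partial results

    ResultCollector(int expected) {
        this.expected = expected;
    }

    // Record one partial result; returns true once every worker has reported back.
    boolean add(int partial) {
        total += partial;
        received++;
        return received == expected;
    }

    int total() {
        return total;
    }
}
```

The parent would call add() for each worker reply and send total() to the original caller once add() returns true.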

Using the previously described scenario you would typically send a message to your workers from your parent actor by doing something like this:
originalCaller = getSender();  //Needed to send back the total result
worker.tell(new Begin(), getSelf());
and then you would wait for the workers to report back by having this in your onReceive() method:
if (Result.class.isInstance(message)) {
  if (allWorkersHasReportedBack()) { /* reply to originalCaller with the total result */ }
}
This is not as complicated as it sounds, and it can be made into a decent and workable solution, but there is a simpler way of solving this problem: Akka's Futures and the functional features that come with them. You can read all about these features in the Akka documentation, but let's look at a hands-on example.

The functional approach

Instead of just sending a message to our workers we will ask them for a result.
Future<Object> future = ask(worker, new Begin(), 1000);
This will give us a Future that we can use to get our partial result from. So if we ask all our workers we can then use the returned futures to get all the partial results. And lastly, we can combine the partial results into a total result and then report back to the original caller.

This is where the method fold() comes into the picture. If you are familiar with Scala then this functionality will not be anything new to you but this is one handy method that is available in Akka and it can really make your life easier if you learn it.

Taken from the documentation, fold "takes a start-value, a sequence of Future:s and a function from the type of the start-value, a timeout, and the type of the futures and returns something with the same type as the start-value, and then applies the function to all elements in the sequence of futures, non-blockingly, the execution will be started when the last of the Futures is completed."

Translated to our example, this means that the function we will provide to fold() will be responsible for combining the partial results into a total result. In code this would look like this:
private Result totalResult(List<Future<Object>> answers) throws Exception {
  // Start from Result(0) and add each worker's Integer response to the running total
  final Future<Result> totalResult = fold(new Result(0), answers, new Function2<Result, Object, Result>() {
    public Result apply(Result previousResult, Object workerResponse) throws Exception {
      return new Result(previousResult.result + (Integer) workerResponse);
    }
  }, getContext().dispatcher());

  // Block for at most 2 seconds until the combined result is available
  return Await.result(totalResult, Duration.create(2, TimeUnit.SECONDS));
}
(In our example we are just summing up integers, but it serves to illustrate the concept of combining partial results into a total result.)
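If you want to experiment with the folding idea without Akka on the classpath, it can be approximated with the JDK's own CompletableFuture. This is only a sketch of the concept with a swapped-in future type, not Akka's implementation: FoldSketch and its fold() helper are hypothetical names, and the helper combines results in list order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical stand-in for the fold idea, built on the JDK's CompletableFuture.
final class FoldSketch {
    // Combines the results of all futures into a single future, starting from `zero`.
    // Nothing blocks here: each combination step is chained onto the previous one.
    static <T, R> CompletableFuture<R> fold(
            R zero, List<CompletableFuture<T>> futures, BiFunction<R, T, R> combine) {
        CompletableFuture<R> acc = CompletableFuture.completedFuture(zero);
        for (CompletableFuture<T> f : futures) {
            acc = acc.thenCombine(f, combine);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Three "workers", each producing a partial result asynchronously
        List<CompletableFuture<Integer>> partials = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        int total = fold(0, partials, Integer::sum).join();
        System.out.println(total); // prints 6
    }
}
```

As in the Akka version, the only place that waits is the final join(), which plays the role of Await.result().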

So, if we implement this then what is left in our parent's onReceive() would be this:
if (Begin.class.isInstance(message)) {
  final List<Future<Object>> answers = askWorkers();
  getSender().tell(totalResult(answers), getSelf());
}
This has effectively eliminated the need for you to keep track of the partial results, the original caller, and all the code that goes with it.

fold() also comes in a version called reduce(), with the difference being that reduce() does not take a start-value. Instead, it will use the first completing future as the start value.
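The difference can be illustrated with the same kind of plain-JDK stand-in. Again, this is a sketch under assumptions, not Akka's code: ReduceSketch is a hypothetical name, and it seeds with the first future in the list rather than the first one to complete. For an order-independent operation such as addition the outcome is the same.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for the reduce idea, built on the JDK's CompletableFuture.
final class ReduceSketch {
    // Like a fold, but with no start value: the first future in the list seeds the result.
    static <T> CompletableFuture<T> reduce(
            List<CompletableFuture<T>> futures, BinaryOperator<T> combine) {
        if (futures.isEmpty()) {
            // With no start value there is nothing to return for an empty list
            throw new IllegalArgumentException("reduce needs at least one future");
        }
        CompletableFuture<T> acc = futures.get(0);
        for (CompletableFuture<T> f : futures.subList(1, futures.size())) {
            acc = acc.thenCombine(f, combine);
        }
        return acc;
    }
}
```

Because there is no start value, the result type has to be the same as the type of the partial results, which is why the combining function here is a BinaryOperator.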

Akka's futures, being monads, offer an array of methods that enable you to do some pretty powerful stuff right out of the box. Very often they can help simplify your solutions and just make your life easier in general, so you should definitely check them out.

Sample code for the example used in this blog post can be found as a Gist here.
