Friday, September 3, 2010

Creating a simple simulation - Harvesters

In my previous post, I created a specification for my Harvesters. Now it's time to actually implement them and let them conform to that spec.

This post is mostly about messaging between actors, the different choices in handling those messages, and the impact these choices have on the concurrency of your system.

First approach

It's always better to start simple, so let's have a look at the following implementation:

import scala.actors.Actor

class Harvester(val name: String,
                val goods: String,
                val speed: Int,
                val capacity: Int) extends Actor {

  private var producer: Actor = _
  var stored = 0

  def connectTo(p: Actor): Harvester = {
    producer = p
    this
  }

  def act {
    loop {
      react {
        case Harvest if producer == null => {
          reply(Harvested(goods, 0))
        }
        case Harvest => {
          producer !? Produce match {
            case Produced(produced, amount) => {
              stored += amount
              println("[%s] Harvested %d %s".format(name, amount, goods))
              reply(Harvested(goods, amount))
            }
          }
        }
        case Stop => exit()
        case msg => println("[%s] Ignoring %s".format(name, msg))
      }
    }
  }
}

A nice start. This code ensures my specification actually compiles, and it also passes the first three requirements. I expected most of the other requirements to fail, but there is one surprising failure: this code does not run with thousands of other Harvesters in parallel! Looking at the output of that test, I could see the following:

[p5] Producing 1 g
[p6] Producing 1 g
[h5] Harvested 1 g
[p8] Producing 1 g
[p1] Producing 1 g
[h8] Harvested 1 g
[p4] Producing 1 g
[p7] Producing 1 g
[h7] Harvested 1 g
[h1] Harvested 1 g
[h6] Harvested 1 g
[h4] Harvested 1 g

run with thousands of other Harvesters in parallel
org.specs.specification.FailureExceptionWithResult: ...

That's right: there were only six actors running during the entire 2 seconds. Sad, and not very concurrent.

Actors, Messaging and Concurrency

The Scala library provides three operators to communicate with an Actor:

  • actor ! Message - the message is sent and the sender immediately continues its business.
  • actor !? Message - the message is sent, and the sender waits for the reply before continuing. While the sender is waiting for the response, it blocks its current thread.
  • actor !! Message - the message is sent and the sender obtains a Future that will eventually contain the response. However, when the Future is evaluated, the sender again blocks its current thread.
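
To make the difference concrete, here is a minimal sketch of the three operators against a trivial echo actor. The names SendDemo, Ping and Pong are my own and not part of the Harvester code:

```scala
import scala.actors.Actor
import scala.actors.Actor._

// Illustrative sketch of !, !? and !! (old scala.actors library)
object SendDemo {
  case object Ping
  case class Pong(n: Int)

  // A trivial actor that answers every Ping with Pong(42)
  val echo: Actor = actor {
    loop {
      react {
        case Ping   => reply(Pong(42))
        case "stop" => exit()
      }
    }
  }

  def main(args: Array[String]) {
    echo ! Ping              // fire-and-forget: no reply is awaited

    val r = echo !? Ping     // blocks the calling thread until the reply arrives
    println(r)               // Pong(42)

    val f = echo !! Ping     // returns a Future immediately...
    println(f())             // ...but evaluating it blocks, just like !?

    echo ! "stop"
  }
}
```

Only the first form is safe to use freely from inside an actor running on the shared pool; the other two tie up a pool thread until the answer arrives.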

Another interesting fact is that Scala schedules its Actors on a limited thread pool; this implies that if an actor blocks its current thread, it also starves the other actors sharing that pool.
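
As a side note, the pool size can be tuned. The following is only a sketch of a symptomatic fix, assuming the default scala.actors scheduler reads the actors.corePoolSize and actors.maxPoolSize system properties; a bigger pool merely delays starvation rather than removing the blocking calls:

```scala
// Sketch: enlarging the actor thread pool (assumption: the default
// scala.actors scheduler honours these system properties, and they
// must be set before the first actor starts)
object PoolConfig {
  def main(args: Array[String]) {
    System.setProperty("actors.corePoolSize", "16")
    System.setProperty("actors.maxPoolSize", "256")
    // ... start actors afterwards; the blocking calls are still there,
    // there are just more threads to burn through
  }
}
```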

These two facts explain why it wasn't possible to run my thousands of Harvesters – each Harvester waits for its Producer's reply using !?, thereby blocking its thread. In other words: I needed a different mechanism for my Harvest case:

case Harvest => {
  val caller = sender
  producer ! Produce
  react {
    case Produced(produced, amount) => {
      stored += amount
      println("[%s] Harvested %d %s".format(name, amount, goods))
      caller ! Harvested(goods, amount)
    }
  }
}

This code fragment does the following:

When a matching Harvest arrives:

  1. The implicit sender, which has been automatically set by the Actor framework, is set aside in caller.
  2. A Produce message is sent to the associated producer, using !. This is a non-blocking call, and our actor continues by starting a new non-blocking react block.
  3. When the Produced message finally arrives, the incoming goods are stored, and a Harvested message is sent to the original caller.

A small change, but this was enough. I was now able to have 10.000 Producers and 10.000 Harvesters concurrently sending messages around within the timespan of 1 second. I did my victory dance and went on until the entire HarvesterSpec was met.

Note how I had to take apart reply(Harvested(...)) into remembering the original sender and explicitly sending the message. Had I used reply inside the nested react, the Harvested message would have been sent to the Producer instead of the spec, because by that point sender refers to whoever sent the Produced message.
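
This pitfall can be shown in isolation. The sketch below uses illustrative names of my own (ForwardDemo, Ask, Answer, backend, middleman): the middleman captures its original sender before forwarding, so the final answer reaches the right place:

```scala
import scala.actors.Actor._

// Sketch of the capture-the-sender pattern (old scala.actors library)
object ForwardDemo {
  case object Ask
  case class Answer(n: Int)

  // The actor that actually produces the answer
  val backend = actor {
    loop { react { case Ask => reply(Answer(7)) } }
  }

  // Forwards requests to the backend and routes the answer back
  val middleman = actor {
    loop {
      react {
        case Ask =>
          val caller = sender          // remember the original requester
          backend ! Ask                // non-blocking forward
          react {                      // wait without blocking the thread
            // reply here would answer the backend; caller ! fixes that
            case Answer(n) => caller ! Answer(n)
          }
      }
    }
  }

  def main(args: Array[String]) {
    middleman !? Ask match {
      case Answer(n) => println(n)     // 7
    }
  }
}
```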

Lessons learned

During this step, I gained a deeper insight into the intricacies of Scala Actors and concurrency. I learned a lot about the semantics of the different types of message-sending and their impact.

Final words

OK, I cheated. If you take the exact Spec from my previous post and run it, you will not pass the "run with thousands of other Harvesters in parallel" requirement. Instead, Futures.awaitAll will throw either a java.lang.OutOfMemoryError or a java.lang.StackOverflowError.

Trying to make sense of the Futures source code, I found that Futures.awaitAll is recursive; this is fine for reasonable numbers of Future objects, but awaiting 10.000 such futures at the same time is not going to work. I therefore modified the source code of my original spec to work around this.
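
One possible workaround, sketched here with illustrative names of my own: instead of handing the whole list to Futures.awaitAll, block on each Future individually with apply(), which needs no recursion over the list:

```scala
import scala.actors.Futures

// Workaround sketch: evaluate each Future with apply() instead of
// Futures.awaitAll, so no deep recursion over the whole list is needed
object AwaitDemo {
  def main(args: Array[String]) {
    val futures = (1 to 1000).map(i => Futures.future(i * 2))
    val results = futures.map(f => f())  // apply() blocks until the value is ready
    println(results.sum)                 // 1001000
  }
}
```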

Get the code

As usual, the full code (including the Harvester-parts I did not describe) can be found at code.google.com.
