In my previous post, I created a specification for my Harvesters. Now it's time to actually implement them and make them conform to that spec.
This post is mostly about messaging between actors, the different choices in handling those messages, and the impact these choices have on the concurrency of your system.
First approach
It's always better to start simple, so let's have a look at the following implementation:
```scala
import scala.actors.Actor

class Harvester(val name: String, val goods: String, val speed: Int, val capacity: Int) extends Actor {
  private var producer: Actor = _
  var stored = 0

  def connectTo(p: Actor): Harvester = {
    producer = p
    this
  }

  def act(): Unit = {
    loop {
      react {
        // Without a producer there is nothing to harvest.
        case Harvest if producer == null =>
          reply(Harvested(goods, 0))
        case Harvest =>
          // !? blocks this actor's thread until the producer answers.
          producer !? Produce match {
            case Produced(produced, amount) =>
              stored += amount
              println("[%s] Harvested %d %s".format(name, amount, goods))
              reply(Harvested(goods, amount))
          }
        case Stop => exit()
        case msg => println("[%s] Ignoring %s".format(name, msg))
      }
    }
  }
}
```
A nice start. This code ensures my specification actually compiles, and it also passes the first three requirements. I expected most of the other requirements to fail, but there is one surprising failure: this code does not run with thousands of other Harvesters in parallel! Looking at the output of that test, I could see the following:
```
[p5] Producing 1 g
[p6] Producing 1 g
[h5] Harvested 1 g
[p8] Producing 1 g
[p1] Producing 1 g
[h8] Harvested 1 g
[p4] Producing 1 g
[p7] Producing 1 g
[h7] Harvested 1 g
[h1] Harvested 1 g
[h6] Harvested 1 g
[h4] Harvested 1 g
run with thousands of others Harvesters in parallel
org.specs.specification.FailureExceptionWithResult: ...
```
That's right: during the entire 2 seconds, only six Harvesters (and their six Producers) made any progress. Sad, and not very concurrent.
Actors, Messaging and Concurrency
The Scala library provides three operators to communicate with an Actor:
- actor ! Message: the message is sent, and the sender continues with its own business.
- actor !? Message: the message is sent, and the sender waits for the reply before continuing. While waiting for the response, the sender blocks its current thread.
- actor !! Message: the message is sent, and the sender obtains a Future that will eventually contain the response. However, when the Future is evaluated before the response has arrived, the sender again blocks its current thread.
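scala.actors has since been deprecated, so the sketch below only mimics the three send styles with scala.concurrent, as an analogy rather than the actors API. The object SendStyles and its stand-in produce method (playing a Producer that answers a Produce message) are hypothetical names chosen for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Analogy only: mimics the three send styles with scala.concurrent, not scala.actors.
// `produce` is a hypothetical stand-in for a Producer handling a Produce message.
object SendStyles {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def produce(): Int = { Thread.sleep(50); 1 }

  // Like `actor ! msg`: hand the work off and carry on immediately; no answer is kept.
  def fireAndForget(): Unit = {
    Future(produce())
    ()
  }

  // Like `actor !? msg`: the calling thread is parked until the answer arrives.
  def sendAndWait(): Int = Await.result(Future(produce()), 1.second)

  // Like `actor !! msg`: a handle comes back at once; blocking only happens
  // if the caller later waits on it before the answer is ready.
  def sendForFuture(): Future[Int] = Future(produce())
}
```

The last two differ only in when the waiting happens: in both cases a thread ends up parked if the answer is not there yet.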
Another interesting fact is that Scala runs its Actors on a limited thread pool. As a consequence, an actor that blocks its current thread also starves the other actors sharing that pool.
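The starvation effect can be reproduced with a plain java.util.concurrent fixed thread pool standing in for the actor scheduler. This is an assumption for illustration: the real scheduler differs in detail. Each hypothetical "harvester" task queues a "producer" task on the same pool and then blocks waiting for it, much like producer !? Produce. StarvationDemo and timeouts are made-up names.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit, TimeoutException}

// Illustration with a java.util.concurrent pool, NOT the scala.actors scheduler:
// tasks that block their thread waiting for work queued on the same small pool starve it.
object StarvationDemo {
  /** Counts the "harvester" tasks that gave up waiting for their "producer" task. */
  def timeouts(poolSize: Int, harvesters: Int): Int = {
    require(harvesters <= poolSize, "each harvester needs its own thread for this demo")
    val pool = Executors.newFixedThreadPool(poolSize)
    // Ensures every harvester occupies a thread before any producer is queued.
    val allStarted = new CountDownLatch(harvesters)
    try {
      val results = (1 to harvesters).map { _ =>
        pool.submit(new Callable[Boolean] {
          def call(): Boolean = {
            allStarted.countDown()
            allStarted.await()
            // The producer's work goes to the same pool...
            val produced = pool.submit(new Callable[Int] { def call(): Int = 1 })
            // ...while this thread blocks on it, like `producer !? Produce`.
            try { produced.get(500, TimeUnit.MILLISECONDS); false }
            catch { case _: TimeoutException => true }
          }
        })
      }
      results.count(_.get())
    } finally {
      pool.shutdownNow()
    }
  }
}
```

With two threads and two harvesters, both threads are parked and the queued producers cannot run before the timeout, so timeouts(2, 2) reports 2. A single spare thread is enough for both producers to run, so timeouts(3, 2) reports 0.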
Together, these two facts explain why my thousands of Harvesters could not run: each Harvester waits for its Producer's reply using !?, thereby blocking its thread. In other words, I needed a different mechanism for my Harvest case:
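```scala
case Harvest =>
  // Remember who asked; sender changes as soon as the next message is received.
  val caller = sender
  // Non-blocking send: this actor does not wait for the answer.
  producer ! Produce
  react {
    case Produced(produced, amount) =>
      stored += amount
      println("[%s] Harvested %d %s".format(name, amount, goods))
      caller ! Harvested(goods, amount)
  }
```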
This code fragment does the following:
- When a matching Harvest message arrives:
  - The implicit sender, which has been set automatically by the Actor framework, is set aside in caller.
  - A Produce message is sent to the associated producer, using !. This is a non-blocking call, and our actor continues by starting a new, non-blocking react block.
  - When the Produced message finally arrives, the incoming goods are stored, and a Harvested message is sent to the original caller.
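The non-blocking pattern can be imitated with scala.concurrent too, again as an analogy rather than scala.actors code. In this sketch each hypothetical harvest asks for its goods asynchronously and attaches a continuation that delivers a Harvested reply to a remembered caller. Since no thread is ever parked waiting for a producer, 10,000 harvests should complete on a pool of just two threads. NonBlockingHarvest, run and this Harvested case class are illustrative names.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

// Analogy with scala.concurrent, not scala.actors: instead of parking a thread until the
// producer answers, each harvest registers what to do once the answer arrives.
object NonBlockingHarvest {
  final case class Harvested(goods: String, amount: Int) // hypothetical reply message

  /** Runs `harvests` harvests on a pool of `threads` threads; returns the replies received. */
  def run(harvests: Int, threads: Int): Int = {
    val executor = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    val replies = new ConcurrentLinkedQueue[Harvested]()
    val allReplied = new CountDownLatch(harvests)
    try {
      for (_ <- 1 to harvests) {
        // Remember who should get the answer, like `val caller = sender`.
        val caller: Harvested => Unit = { h => replies.add(h); allReplied.countDown() }
        // Like `producer ! Produce`: returns at once, nothing waits here.
        val produced: Future[Int] = Future(1)
        // Like the nested `react`: runs only once the produced amount is available.
        produced.foreach(amount => caller(Harvested("goods", amount)))
      }
      allReplied.await(10, TimeUnit.SECONDS)
      replies.size
    } finally {
      executor.shutdown()
    }
  }
}
```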
A small change, but it was enough: I was now able to have 10,000 Producers and 10,000 Harvesters concurrently sending messages around within the timespan of 1 second. I did my victory dance and carried on until the entire HarvesterSpec was met.
Note how I had to split reply(Harvested(...)) into remembering the original sender and explicitly sending the message to it. Had I used reply, the Harvested message would have been sent to the Producer (the sender of the most recently received message, Produced) instead of to the spec.
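Why reply would have gone to the Producer can be made concrete with a toy model. ToyHarvester is hypothetical and not part of scala.actors; it only mimics the one rule that matters here: sender, and therefore reply, always refers to whoever sent the message currently being handled.

```scala
import scala.collection.mutable.ListBuffer

// Toy model, not the scala.actors API: `sender` is overwritten by every message the
// actor receives, and `reply` simply sends to the current `sender`.
final class ToyHarvester {
  private var sender: String = ""
  val outbox: ListBuffer[(String, String)] = ListBuffer.empty // (recipient, message)

  private def receive(from: String): Unit = sender = from
  private def reply(msg: String): Unit = outbox += (sender -> msg)

  def handleHarvest(useReply: Boolean): Unit = {
    receive("spec")                      // Harvest arrives from the spec
    val caller = sender                  // remember the original asker
    outbox += ("producer" -> "Produce")  // producer ! Produce
    receive("producer")                  // nested react: Produced arrives
    if (useReply) reply("Harvested")     // goes to the producer, not the spec
    else outbox += (caller -> "Harvested")
  }
}
```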
Lessons learned
During this step, I gained deeper insight into the intricacies of Scala Actors and concurrency, and I learned a lot about the semantics of the different ways of sending messages and their impact.
Final words
OK, I cheated. If you take the exact spec from my previous post and run it, you will not pass the "run with thousands of other Harvesters in parallel" requirement. Instead, Futures.awaitAll will throw either a java.lang.OutOfMemoryError or a java.lang.StackOverflowError.
Digging into the Futures source code, I found that Futures.awaitAll is recursive. That is fine for reasonable numbers of Future objects, but awaiting 10,000 such futures at the same time is not going to work, so I modified the source code of my original spec to work around it.
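The post does not show how the spec was changed. One possible approach, sketched here with scala.concurrent rather than scala.actors.Futures, is to wait for the futures one at a time in a plain loop against a shared deadline. The stack depth then stays constant however many futures there are, and, mirroring the Option-per-future result of Futures.awaitAll, a future that misses the deadline yields None. AwaitMany.awaitAll is a hypothetical stand-in, not the library method.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical helper built on scala.concurrent (not scala.actors.Futures.awaitAll):
// waits for each future in turn against one shared deadline, iteratively, so the
// stack depth does not grow with the number of futures.
object AwaitMany {
  def awaitAll[A](timeoutMillis: Long, futures: Seq[Future[A]]): Seq[Option[A]] = {
    val deadline = timeoutMillis.millis.fromNow
    futures.map { f =>
      // A future still pending when the deadline passes yields None;
      // a failed future rethrows its exception here.
      try Some(Await.result(f, deadline.timeLeft))
      catch { case _: TimeoutException => None }
    }
  }
}
```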
Get the code
As usual, the full code (including the Harvester parts I did not describe) can be found at code.google.com.