5

in Scala 2.8 when I start actors, I can communicate via message passing. This in turn means that I can send the ultimate Exit() message or whatever I decide fits my protocol.

But how will I check if an actor has exited? I can easily imagine myself having a task where a master actor starts some worker actors and then simply waits for the answers, each time checking if this was the final answer (i.e. are any Actors still working or did they all exit?).

Of course I can let them all send back an "I'm done" message, and then count them, but this is somehow unsatisfactory.

What is best practise when testing for the completion of worker-actors?

EDIT#1

I am looking into Futures, but having trouble. Can someone explain why this code doesn't work:

package test
import scala.actors.Futures._

object FibFut extends Application{
    
    def fib(i:Int):Int = 
        if(i<2)
            1
        else
            fib(i-1)+fib(i-2)
            
    val f = future{ fib(3) }
    
    println(f())    
        
}

It works if I define the function fib inside the future-body. It must be a scope thing, but I don't get any errors with the above, it simply hangs. Anyone?

EDIT#2

It seems that extending Application wasn't a nice way to go. Defining a main method made everything work. The below code is what I was looking for, so Futures get the thumbs up :)

package test

import scala.actors.Futures._

object FibFut {

  def fib(i: Int): Int = if (i < 2) 1 else fib(i - 1) + fib(i - 2)

  def main(args: Array[String]) {

    val fibs = for (i <- 0 to 50) yield future { fib(i) }

    for (future <- fibs) println(future())

  }

}
1
  • 3
    Uhm, this thread was interesting to me: stackoverflow.com/questions/2721337/… , along with using this trick on a list of actors: computers.map(.getState==Actor.State.Terminated).reduceRight(_&&),to check if all actors inside the computers list have terminated. If that is the case, the master thread can go into a "finish reading mailbox and exit" state with the use if reactwithin. I will post the solution if I get it done :-)
    – Felix
    Nov 29, 2010 at 13:53

3 Answers 3

3

I'm a fan of "I'm done" messages, personally; it's a good way to manage distribution of work, and as a bonus, you already know when all children have finished what they're doing.

But if you really just want to farm out some work once and wait until everything is ready, check out scala.actors.Futures. You can ask it to do some computation:

val futureA = Futures.future {
  val a = veryExpensiveOperation
  (a,"I'm from the future!")
}

and then you can wait for everything to complete, if you have made multiple requests:

Futures.awaitAll(600*1000, futureA, futureB, futureC, futureD)
// Returns once all of A-D have been computed
val actualA = futureA()   // Now we get the value
8
  • Busy-waiting in the master? :( Nov 28, 2010 at 22:59
  • Okay, futures are looking very interesting. I see this function in the Future-class: def isSet : Boolean, which is probably what I need. Is there a tutorial/example on how to create/manage your own future objects without using the Futures-companion-object?
    – Felix
    Nov 29, 2010 at 8:58
  • @Viktor - In the "I'm done" model, the master reacts to the "I'm done" messages of the workers. In the "do it once" model, the master is only a handy way to encapsulate multiple futures in some sensible order. I don't see where busy-waiting would arise.
    – Rex Kerr
    Nov 29, 2010 at 13:28
  • @Felix - Calling isSet yourself is usually a bad idea, since you have to poll for the result to complete. What's wrong with using the Futures companion object, and what's wrong with using built in methods (or blocking on apply) to wait for children?
    – Rex Kerr
    Nov 29, 2010 at 13:31
  • @Viktor - I had assumed it blocks by sleeping and getting interrupted, not by busy-waiting. I haven't actually checked. Is it really busy-waiting?
    – Rex Kerr
    Nov 30, 2010 at 16:10
2

A while ago I wrote a post on linking actors in Scala. Actor linking is an idiomatic [and the easiest] way to monitor actors in Erlang, Scala Actors and other actor libraries. By defalt, when you link 2 actors, and one of them dies, another immediately dies too (unless the actor traps/handles exit signal):

scala> case object Stop
defined module Stop

scala>

scala> val actor1 = actor {
     |    loop {
     |       react {
     |          case Stop =>
     |             println("Actor 1: stop")
     |             exit()
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor1: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala>

scala> val actor2 = actor {
     |    link(actor1)
     |    self.trapExit = true
     |    loop {
     |       react {
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor2: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1.start
res12: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala> actor2.start
res13: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1 ! Stop
Actor 1: stop

scala> Exit(scala.actors.Actor$$anon$1@1feea62,'normal)  // Actor 2 received message, when Actor1 died

A more sophisticated and flexible way is using supervisors (supervisor behavior in Erlang, actor supervisors in Akka Actors library, etc). A supervisor (being itself an actor) monitors a number of other actors, and restarts them with regards to a specified strategy (restart all actors, if one dies; restart just one actor, when it dies).

5
  • linking is far from perfect. Firstly, I believe that if you attempt to link to an actor which has already exited, you will not receive notification. Secondly, linking to an actor which falls over before it enters any kind of react/receive loop also misses any callbacks. Nov 28, 2010 at 17:45
  • Linking is idiomatic. In the both cases you mentioned, before linking it makes sense to call getState, and make sure that actor's state is different from 'terminated'. Nov 28, 2010 at 17:55
  • Vasil, can you elaborate on getState? If I can check to see if an actor has terminated, that will also be a good solution for me. edit Okay, I looked at this link: scala-lang.org/api/current/index.html , and I see theres an enumerate. I'm guessing randomActorObject.getState==State.Terminated would be the check I am looking for.
    – Felix
    Nov 29, 2010 at 9:01
  • During the lifetime, actor can be in one of the following 7 states: New, Runnable, Suspended, TimedSuspended, Blocked, TimedBlocked, Terminated (values of State enumeration). You can call anytime getState: Actor.State.Value on the actor instance to find out, what's the current state of an actor. Nov 29, 2010 at 9:06
  • I really don't know which of the two answers I should accept. They both helped me a great deal! Futures does however seem to be doing essentially what I need.
    – Felix
    Nov 30, 2010 at 7:47
0

Okay everyone, i have come up with a solution using the getState function of the actor class. In the solution I used an idea from this thread: Best method to peek into a Scala Actor's Mailbox in which reactWithin(0) is used. I ran into trouble when using react and loop where the program would simply block on big calculations. This was solved by replacing loop with while(true) and reactWithin(int) with receiveWithin(int).

My solution looks as follows (beware, bigass code-lump):

package test

import scala.actors._
import scala.actors.Actor.State._

case class Computation(index: Int, a: () ⇒ Int)
case class Result(i: String)
object Main {
  def main(args: Array[String]) {
    val m = new Master
    m.start
  }
}

class Master extends Actor {

  val N = 40
  var numberOfAnswers = 0

  def fib(x: Int): Int =
    if (x < 2)
      1
    else
      fib(x - 1) + fib(x - 2)

  val computers = for (i ← 0 to N) yield new Computer

  def act {

    for (i ← 0 until computers.size) {
      computers(i).start
      computers(i) ! Computation(i, () => fib(i))
    }

    println("done Initializing actors")
    while (true) {
      receiveWithin(1000) {

        case Result(i) =>
          val numberDone = computers.map(_.getState == Terminated).filter(_ == true).length
          println(i)
          numberOfAnswers += 1

        case TIMEOUT =>
          val allDone = computers.map(_.getState == Terminated).reduceRight(_ && _)
          println("All workers done?:" + allDone)
          println("# of answers:" + numberOfAnswers)
          if (allDone)
            exit()
      }
    }

  }

}

class Computer extends Actor {

  def act {
    loop {
      react {
        case Computation(i, f) ⇒
          sender ! Result("#" + i + " Res:" + f())
          exit()
      }
    }
  }

}

The program calculates the fibonacci numbers (in the worst possible way). The idea is simply to test utilization of multiple threads for big workloads. The following line checks whether some actor has yet to terminate:

computers.map(_.getState == Terminated).reduceRight(_ && _)

where computers is of the type IndexedSeq[Computer]. The trick is that using the TIMEOUT message, I can periodically check if all the work is done and act accordingly (in this case, exit when there are no more active workers). I take advantage of the fact that each worker sends the results before they exit. In that way I know that I will always receive the results and handle them before they will be shown as Terminated.

Can someone comment on the fact that the program "locks up" (stops receiving messages) when I use react and loop instead of while(true) and receive?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Not the answer you're looking for? Browse other questions tagged or ask your own question.