Code Examples for

Actors in Scala

Return to chapter index

10 Akka Actors

  • 10.1 Creating Akka actors
  • 10.2 ActorRefs
  • 10.3 Inter-actor interaction, interactively
  • 10.4 Message handling
  • 10.5 Remote actors in Akka
    10.1 Creating Akka actors


    import akka.actor.{Actor, ActorRef, Channel}
    import Actor._

    /** One link in a chain of actors that is shut down by a 'Die / 'Ack exchange.
      *
      * @param next the following actor in the chain, or `None` for the last link
      */
    class ChainActor(next: Option[ActorRef]) extends Actor {
      // Channel captured while handling 'Die; the 'Ack is sent over it later.
      var from: Channel[Any] = _

      def receive = {
        case 'Die =>
          from = self.channel
          next match {
            // Not the last link: pass the request on; this actor stops when 'Ack arrives.
            case Some(successor) =>
              successor ! 'Die
            // Last link: acknowledge right away, then stop.
            case None =>
              from ! 'Ack
              self.stop()
          }

        case 'Ack =>
          // Stop first, then relay the acknowledgement over the channel remembered from 'Die.
          self.stop()
          from ! 'Ack
      }
    }
    /** Creates and starts a ChainActor whose successor is `next`, then recurses with
      * the new actor as successor until `size` has dropped to 1.
      *
      * At least one actor is always created, even when `size` is less than 1.
      *
      * @param size number of actors still to create
      * @param next successor handed to the actor created by this call
      * @return the actor created last
      */
    def buildChain(size: Int, next: Option[ActorRef]): ActorRef = {
      val link = actorOf(new ChainActor(next))
      link.start()
      if (size <= 1) link
      else buildChain(size - 1, Some(link))
    }

    10.2 ActorRefs


    // Deliberately failing example: the actor instance is built directly with `new`
    // and only afterwards handed to `actorOf`.
    val chainActor = new ChainActor(next) // Error!
    val ref = actorOf(chainActor)
    akka.actor.ActorInitializationException: ActorRef for instance of actor [examples.akka.ChainActor] is not in scope. You can not create an instance of an actor explicitly using 'new MyActor'. You have to use one of the factory methods in the 'Actor' object to create a new actor. Either use: 'val actor = Actor.actorOf[MyActor]', or 'val actor = Actor.actorOf(new MyActor(..))'
    // The factory block passed to `actorOf` stores the freshly built instance in
    // `leaked` before yielding it, so a direct reference to the actor escapes.
    var leaked: ChainActor = null
    val ref = actorOf {
      leaked = new ChainActor(next)
      leaked
    }

    10.3 Inter-actor interaction, interactively


    // Actor that calls buildChain(n, None) on construction and, on 'Start, sends
    // 'Die to the returned actor; it stops itself when 'Ack arrives.
    class MasterActor(n: Int) extends Actor {
      // Reference returned by buildChain for size n and no successor.
      val first = buildChain(n, None)

      // Body elided in this listing.
      def buildChain(size: Int, next: Option[ActorRef]): ActorRef = ...

      def receive = {
        case 'Start =>
          first ! 'Die
        case 'Ack =>
          println("OK, all actors died")
          self.stop()
      }
    }
    scala> import akka.actor.Actor._
    import akka.actor.Actor._

    scala> val master = actorOf(new MasterActor(5))
    AKKA_HOME is defined as [.../akka-actors-1.1.2], loading config from [.../akka-actors-1.1.2/config/akka.conf].
    master: akka.actor.ActorRef = Actor[MasterActor:0291...a5d2]

    scala> master.start()
    res0: akka.actor.ActorRef = Actor[MasterActor:0291d...4a5d2]

    scala> master ! 'Start
    OK, all actors died

    10.4 Message handling


    // Skeleton of a message handler with two phases; the bodies are placeholders.
    def receive = {
      case InitData(item) =>
        // add item to data set
      case StartProcessing(condition) =>
        // process data set until `condition` is true
    }
    // Message handler that, once processing starts, hands work to every worker and
    // then switches behavior with `become` to handle WorkerDone messages.
    def receive = {
      case InitData(item) =>
        // add item to data set
      case StartProcessing(condition) =>
        // process data set until `condition` is true

        // instruct workers to process their part of
        // the data set
        for (worker <- workers) {
          worker ! Process(part)
        }
        become {
          case WorkerDone =>
            // the worker that sent this message is done
            ...
        }
    }
    // Behavior installed while workers are running; once `condition()` holds,
    // the handler defined by `receive` is installed again.
    become {
      case WorkerDone =>
        // the worker that sent this message is done
        ...
        if (condition()) become(receive)
    }
    /** Message carrying an element to store in the buffer. */
    case class Put(elem: Int)

    /** Message requesting the stored element. */
    case object Get

    /** Message asking the buffer actor to stop. */
    case object Stop

    /** Single-slot buffer written with nested `become` calls.
      *
      * After a `Put`, the installed behavior matches only `Get`; it replies with the
      * stored value and then re-installs `receive`.
      */
    class SimpleBuffer extends Actor {
      def receive = {
        case Put(value) =>
          // From here on only Get is matched, until the value has been handed out.
          become {
            case Get =>
              self.reply(value)
              // Slot is empty again: go back to the initial behavior.
              become(receive)
          }

        case Stop =>
          self.stop()
      }
    }
    /** Actor that, on `Start`, requests two elements from `buf`, prints each result,
      * tells the buffer to stop and then stops itself.
      *
      * @param buf the buffer actor to read from
      */
    class Consumer(buf: ActorRef) extends Actor {
      def receive = {
        case Start =>
          // blocks actor until response is received
          println(buf ? Get)
          println(buf ? Get)
          buf ! Stop
          self.stop()
      }
    }

    /** Driver: starts a SimpleBuffer, puts two elements into it and starts a Consumer. */
    object SimpleBuffer {
      def main(args: Array[String]): Unit = {
        val buffer = actorOf(new SimpleBuffer)
        buffer.start()
        // Both Puts are sent before the consumer exists.
        buffer ! Put(5)
        buffer ! Put(3)

        val consumer = actorOf(new Consumer(buffer))
        consumer.start()
        consumer ! Start
      }
    }
    Some(5) [ERROR] [7/8/11 12:41 PM] [akka:event-driven:dispatcher:global-1] [LocalActorRef] Put(3) akka.actor.UnhandledMessageException: Actor Actor[examples.akka.SimpleBuffer:40...d2] does not handle [Put(3)] [ERROR] [7/8/11 12:41 PM] [akka:event-driven:dispatcher:global-4] [LocalActorRef] Get akka.actor.UnhandledMessageException: Actor Actor[examples.akka.SimpleBuffer:40...d2] does not handle [Get] [ERROR] [7/8/11 12:41 PM] [akka:event-driven:dispatcher:global-2] [LocalActorRef] Start akka.actor.UnhandledMessageException: Actor Actor[examples.akka.SimpleBuffer:40...d2] does not handle [Get]
    /** Buffer actor that keeps explicit queues instead of switching behavior.
      *
      * `elems` holds elements nobody has asked for yet; `consumers` holds the
      * channels of Get requests that arrived while no element was available.
      */
    class FlatBuffer extends Actor {
      var elems = Queue[Int]()
      var consumers = Queue[Channel[Any]]()

      def receive = {
        case Put(item) =>
          if (consumers.nonEmpty) {
            // A consumer is already waiting: hand the element to the oldest one.
            val (waiting, remaining) = consumers.dequeue
            consumers = remaining
            waiting ! item
          } else {
            elems = elems.enqueue(item)
          }

        case Get =>
          if (elems.nonEmpty) {
            // An element is available: send the oldest one over the current channel.
            val (oldest, remaining) = elems.dequeue
            elems = remaining
            self.channel ! oldest
          } else {
            // Nothing to hand out yet: remember the requester's channel.
            consumers = consumers.enqueue(self.channel)
          }

        case Stop =>
          self.stop()
      }
    }

    10.5 Remote actors in Akka


    import akka.actor.{Actor, ActorRef}
    import Actor._
    import java.util.concurrent.CountDownLatch

    /** Entry point that starts the remote module, registers a MasterService actor
      * and blocks until `doneInit` has been counted down.
      */
    object MasterService {
      // Latch that `main` waits on after sending ClusterSize to the master actor.
      val doneInit = new CountDownLatch(1)

      private var _master: ActorRef = _

      /** The master actor reference; unset until `main` has assigned it. */
      def master: ActorRef = _master

      /** Expects three arguments: host name, port and number of nodes. */
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val listenPort = args(1).toInt
        val expectedNodes = args(2).toInt

        remote.start(host, listenPort)
        _master = actorOf[MasterService].start()
        remote.register(_master)

        _master ? ClusterSize(expectedNodes)
        doneInit.await()
      }
    }
    /** Master actor: records the expected cluster size, collects Announce messages
      * from nodes and, once all have registered, sends every node the address list.
      */
    class MasterService extends Actor {
      var numNodes = 0
      var nodeRefs: Map[(String, Int), ActorRef] = Map()

      def receive = {
        case ClusterSize(expected) =>
          numNodes = expected
          println("[Master] waiting for " + expected + " nodes to register")
          self.reply()

        case Announce(newHost, newPort) =>
          println("[Master] new host " + newHost + ":" + newPort)
          val address = (newHost, newPort)
          val nodeRef =
            remote.actorFor(classOf[ClusterService].getCanonicalName, newHost, newPort)
          nodeRefs += (address -> nodeRef)
          if (nodeRefs.size == numNodes) {
            println("[Master] all nodes have registered")
            for (service <- nodeRefs.values) {
              service ? Nodes(nodeRefs.keys.toList)
            }
            // Releases the latch that MasterService.doneInit exposes.
            MasterService.doneInit.countDown()
          }
        ...
      }
    }
    /** Entry point for a cluster node: starts the remote module, registers a
      * ClusterService actor, announces it, and shuts down once `terminate` is released.
      */
    object ClusterService {
      // Latch that `run` waits on before shutting everything down.
      val terminate = new CountDownLatch(1)

      /** Starts the node on `hostname`:`port` and blocks until `terminate` is counted down.
        *
        * @param masterHostname host name sent to the service actor in the Announce message
        * @param masterPort     port sent to the service actor in the Announce message
        * @param hostname       host name the remote module is started on
        * @param port           port the remote module is started on
        */
      def run(masterHostname: String, masterPort: Int, hostname: String, port: Int): Unit = {
        remote.start(hostname, port)
        val service = actorOf[ClusterService].start()
        remote.register(service)
        service ! Announce(masterHostname, masterPort)

        terminate.await()
        registry.shutdownAll() // also stops service actor
        remote.shutdown()
      }

      /** Expects four arguments: master host, master port, own host, own port. */
      def main(args: Array[String]): Unit = {
        val masterHostname = args(0)
        val masterPort = args(1).toInt
        val hostname = args(2)
        val port = args(3).toInt
        run(masterHostname, masterPort, hostname, port)
      }
    }
    /** Per-node service actor: announces itself to the master, stores the node
      * address list, starts actors on request and triggers node shutdown.
      */
    class ClusterService extends Actor {
      var allAddresses: List[(String, Int)] = List()
      var master: ActorRef = _

      def receive = {
        case Announce(hostname, port) =>
          master = remote.actorFor(classOf[MasterService].getCanonicalName, hostname, port)
          // Tell the master this node's own address as reported by the remote module.
          val ownHost = remote.address.getHostName()
          val ownPort = remote.address.getPort()
          master ! Announce(ownHost, ownPort)

        case Nodes(addresses) =>
          println("[ClusterService] received node addresses: " + addresses)
          allAddresses = addresses
          self.reply()

        case StartActorAt(_, _, clazz) =>
          println("[ClusterService] starting instance of " + clazz)
          val spawned = actorOf(clazz).start()
          remote.register(spawned)
          // Pass the known node addresses on to the new actor before replying.
          spawned ? Nodes(allAddresses)
          self.reply()

        case StopServiceAt(_, _) =>
          println("[ClusterService] shutting down...")
          ClusterService.terminate.countDown()
      }
    }
    // Forwards the start request to the node registered under (host, port), then
    // replies with a remote reference looked up by the actor class's canonical name.
    case startMsg @ StartActorAt(host, port, clazz) =>
      val node = nodeRefs((host, port))
      node ? startMsg
      val remoteRef = remote.actorFor(clazz.getCanonicalName, host, port)
      self.reply(remoteRef)
    /** Actor that stores the node list it is given and adds up the integer value of
      * every String message it receives.
      */
    class EchoActor extends Actor {
      var neighbors: List[ActorRef] = List()
      var allAddresses: List[(String, Int)] = List()
      var sum = 0

      def receive = {
        case Nodes(addresses) =>
          allAddresses = addresses
          // One remote reference per address, looked up under ClusterService's class name.
          neighbors = addresses map { case (hostname, port) =>
            remote.actorFor(classOf[ClusterService].getCanonicalName, hostname, port)
          }
          self.reply()

        case any: String =>
          println("[EchoActor] received " + any)
          // try converting to an Int
          // (toInt throws NumberFormatException for non-numeric input; it is not caught here)
          sum += any.toInt
          println("[EchoActor] current sum: " + sum)
      }
    }
    // initialize MasterService
    MasterService.main(Array("localhost", "9000", "1"))

    // remotely start EchoActor
    val response = MasterService.master ? StartActorAt("localhost", 9001, classOf[EchoActor])
    val echoActor = response.get.asInstanceOf[ActorRef]
    echoActor ! "17"
    // Ask the master to stop the service at localhost:9001.
    MasterService.master ? StopServiceAt("localhost", 9001)
    // NOTE(review): `shutdown` is not among the members of the MasterService object
    // shown in this listing — confirm where it is defined.
    MasterService.shutdown()
    [ERROR] [8/5/11 3:29 PM] [akka:event-driven:dispatcher:global-5] [LocalActorRef] hello
    java.lang.NumberFormatException: For input string: "hello"
      at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
      at java.lang.Integer.parseInt(Integer.java:449)
      at java.lang.Integer.parseInt(Integer.java:499)
    // Variant of the actor start-up code that links the new actor to this one
    // instead of merely starting it.
    val newActor = actorOf(clazz)
    // start newActor and link to ClusterService
    self.startLink(newActor)
    remote.register(newActor)
    ...
    // Install a OneForOneStrategy fault handler configured with the exception
    // classes NumberFormatException and RuntimeException.
    // NOTE(review): the trailing arguments 5 and 5000 presumably bound the number of
    // restarts and the time window in milliseconds — confirm against the Akka 1.1 docs.
    self.faultHandler = OneForOneStrategy(List(classOf[NumberFormatException], classOf[RuntimeException]), 5, 5000)
    // Set this actor's life-cycle setting to Permanent.
    // NOTE(review): presumably this marks the actor as one that is restarted after a
    // failure rather than left stopped — confirm against the Akka 1.1 docs.
    self.lifeCycle = Permanent
    // initialize MasterService
    MasterService.main(Array("localhost", "9000", "1"))

    // remotely start EchoActor
    val response = MasterService.master ? StartActorAt("localhost", 9001, classOf[EchoActor])
    val echoActor = response.get.asInstanceOf[ActorRef]

    // this will lead to an exception in echoActor
    echoActor ! "hello"

    // try again; echoActor is restarted automatically
    echoActor ! "17"

    For more information about Actors in Scala, please visit:

    http://www.artima.com/shop/actors_in_scala

    and:

    http://booksites.artima.com/actors_in_scala

    Copyright © 2011 Artima, Inc. All rights reserved.

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.