Code Examples for

Programming in Scala

Return to chapter index

30 Actors and Concurrency

Sample run of chapter's interpreter examples

30.1 Trouble in paradise

30.2 Actors and message passing


// In file concurrency/Actors.scala
import scala.actors._

/** Actor that prints "I'm acting!" five times, sleeping one second after each line. */
object SillyActor extends Actor {
  def act(): Unit = {
    (1 to 5).foreach { _ =>
      println("I'm acting!")
      Thread.sleep(1000)
    }
  }
}
scala> SillyActor.start() I'm acting! res4: scala.actors.Actor = SillyActor$@1945696 scala> I'm acting! I'm acting! I'm acting! I'm acting!
scala> Thread.sleep(5000)
// In file concurrency/Actors.scala
import scala.actors._

/** Actor that prints "To be or not to be." five times, sleeping one second after each line. */
object SeriousActor extends Actor {
  def act(): Unit = {
    val rounds = 1 to 5
    rounds.foreach { _ =>
      println("To be or not to be.")
      Thread.sleep(1000)
    }
  }
}
scala> SillyActor.start(); SeriousActor.start() res3: scala.actors.Actor = SeriousActor$@1689405 scala> To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting! To be or not to be. I'm acting!
scala> Thread.sleep(5000)
scala> import scala.actors.Actor._ scala> val seriousActor2 = actor { | for (i <- 1 to 5) | println("That is the question.") | Thread.sleep(1000) | } scala> That is the question. That is the question. That is the question. That is the question. That is the question.
scala> SillyActor ! "hi there"
// In file concurrency/Actors.scala
/** Actor that loops forever, printing each message it receives. */
val echoActor = actor {
  while (true) {
    receive {
      // The single pattern matches any message.
      case incoming =>
        println("received message: "+ incoming)
    }
  }
}
scala> echoActor ! "hi there" received message: hi there scala> echoActor ! 15 scala> received message: 15
scala> val intActor = actor { | receive { | case x: Int => // I only want Ints | println("Got an Int: "+ x) | } | } intActor: scala.actors.Actor = scala.actors.Actor$$anon$1@34ba6b
scala> intActor ! "hello" scala> intActor ! Math.Pi
scala> intActor ! 12 Got an Int: 12

30.3 Treating native threads as actors


scala> import scala.actors.Actor._ import scala.actors.Actor._ scala> self ! "hello" scala> self.receive { case x => x } res6: Any = hello
scala> self.receiveWithin(1000) { case x => x } // wait a sec! res7: Any = TIMEOUT

30.4 Better performance through thread reuse


// In file concurrency/Actors.scala
/**
 * Actor that resolves host names on request.
 *
 * A `(name, actor)` pair is answered by sending `getIp(name)` to `actor`.
 * The string "EXIT" stops the actor; any other message is reported and ignored.
 */
object NameResolver extends Actor {
  import java.net.{InetAddress, UnknownHostException}

  /** Looks up `name`; yields `None` when the host is unknown. */
  def getIp(name: String): Option[InetAddress] =
    try Some(InetAddress.getByName(name))
    catch { case _: UnknownHostException => None }

  def act(): Unit = {
    react {
      case (host: String, requester: Actor) =>
        requester ! getIp(host)
        act() // re-arm the handler for the next message
      case "EXIT" =>
        println("Name resolver exiting.")
        // quit: act() is not called again, so no further message is handled
      case other =>
        println("Unhandled message: "+ other)
        act()
    }
  }
}
scala> NameResolver.start() res0: scala.actors.Actor = NameResolver$@90d6c5 scala> NameResolver ! ("www.scala-lang.org", self) scala> self.receiveWithin(0) { case x => x } res2: Any = Some(www.scala-lang.org/128.178.154.102) scala> NameResolver ! ("wwwwww.scala-lang.org", self) scala> self.receiveWithin(0) { case x => x } res4: Any = None
// In file concurrency/Actors.scala
/** Repeatedly answers `(name, actor)` requests with `getIp(name)`; reports anything else. */
def act(): Unit =
  loop {
    react {
      case (host: String, requester: Actor) =>
        requester ! getIp(host)
      case other =>
        println("Unhandled message: " + other)
    }
  }

30.5 Good actors style


actor { Thread.sleep(time) mainActor ! "WAKEUP" }
// In file concurrency/Actors.scala
/**
 * Prints "I'm acting!" five times, one second apart, without blocking its
 * own message loop: the delay runs in a helper actor that sends back "Emote".
 * Any other message is printed as "Received: ...".
 */
val sillyActor2 = actor {
  var emoteCount = 0

  def emoteLater(): Unit = {
    // Capture this actor first; the reference is then used from the helper actor.
    val owner = self
    actor {
      Thread.sleep(1000)
      owner ! "Emote"
    }
  }

  emoteLater()
  loop {
    react {
      case "Emote" =>
        println("I'm acting!")
        emoteCount += 1
        if (emoteCount < 5)
          emoteLater()
      case other =>
        println("Received: "+ other)
    }
  }
}
scala> sillyActor2 ! "hi there" scala> Received: hi there I'm acting! I'm acting! I'm acting!
/** Answers each `(name, actor)` request with the pair `(name, getIp(name))`. */
def act(): Unit =
  loop {
    react {
      case (host: String, requester: Actor) =>
        // The reply carries the host name alongside the lookup result.
        requester ! (host, getIp(host))
    }
  }
lookerUpper ! ("www.scala-lang.org", self)
case class LookupIP(hostname: String, requester: Actor) lookerUpper ! LookupIP("www.scala-lang.org", self)
// In file concurrency/Actors.scala
import scala.actors.Actor._
import java.net.{InetAddress, UnknownHostException}

/** Request to resolve `name`; the answer is sent to `respondTo`. */
case class LookupIP(name: String, respondTo: Actor)

/** Reply to a [[LookupIP]]: the requested name and its address, if one was found. */
case class LookupResult(
  name: String,
  address: Option[InetAddress]
)

/** Actor that answers every `LookupIP` message with a `LookupResult`. */
object NameResolver2 extends Actor {

  def act() {
    loop {
      react {
        case LookupIP(name, actor) =>
          actor ! LookupResult(name, getIp(name))
      }
    }
  }

  /**
   * Looks up `name` via `InetAddress.getByName`.
   *
   * Same lookup as `NameResolver.getIp` (Listing 30.3), written out here so
   * the method body actually produces the declared `Option[InetAddress]`.
   *
   * @param name host name to resolve
   * @return `Some(address)` on success, `None` when the host is unknown
   */
  def getIp(name: String): Option[InetAddress] = {
    try {
      Some(InetAddress.getByName(name))
    } catch {
      case _: UnknownHostException => None
    }
  }
}

30.6 A longer example: Parallel discrete event simulation


trait Simulant extends Actor class Wire extends Simulant
// In file concurrency/ParallelSimulation.scala
/** Sent by the clock to every simulant when the simulation reaches `time`. */
case class Ping(time: Int)

/** Sent back to the clock by simulant `from` in answer to the `Ping` for `time`. */
case class Pong(time: Int, from: Actor)
/** First sketch of the clock actor: only its basic state. */
class Clock extends Actor {
  // Work items still waiting to be delivered.
  private var agenda: List[WorkItem] = Nil
  // Current simulation time; starts at zero.
  private var currentTime = 0
  // Initially false.
  private var running = false
}
// In file concurrency/ParallelSimulation.scala
/** Agenda entry: deliver `msg` to `target` when the clock reaches `time`. */
case class WorkItem(
  time: Int,
  msg: Any,
  target: Actor
)
// In file concurrency/ParallelSimulation.scala
/** Request to the clock: deliver `msg` to `target` `delay` ticks after the current time. */
case class AfterDelay(
  delay: Int,
  msg: Any,
  target: Actor
)
// In file concurrency/ParallelSimulation.scala
/** Tells the clock to begin running. */
case object Start

/** Tells the clock, and through it every simulant, to exit. */
case object Stop
// In file concurrency/ParallelSimulation.scala class Clock extends Actor { private var running = false private var currentTime = 0 private var agenda: List[WorkItem] = List() private var allSimulants: List[Actor] = List() private var busySimulants: Set[Actor] = Set.empty
// In file concurrency/ParallelSimulation.scala start()
// In file concurrency/ParallelSimulation.scala
/** Registers `sim` with the clock by prepending it to `allSimulants`. */
def add(sim: Simulant): Unit = {
  allSimulants = sim :: allSimulants
}
// In file concurrency/ParallelSimulation.scala
/**
 * Clock main loop: advances time whenever the clock is running and no
 * simulant is still busy, then handles exactly one incoming message.
 */
def act(): Unit =
  loop {
    val readyToAdvance = running && busySimulants.isEmpty
    if (readyToAdvance)
      advance()
    reactToOneMessage()
  }
// In file concurrency/ParallelSimulation.scala
/**
 * Moves the simulation forward by one tick.
 *
 * If the agenda is empty after time 0, announces the end and sends `Stop`
 * to this actor instead. Otherwise increments the time, delivers the work
 * items due now, pings every simulant and marks them all as busy.
 */
def advance(): Unit = {
  val finished = agenda.isEmpty && currentTime > 0
  if (finished) {
    println("** Agenda empty. Clock exiting at time "+ currentTime+".")
    self ! Stop
  } else {
    currentTime += 1
    println("Advancing to time "+currentTime)

    processCurrentEvents()
    allSimulants.foreach(_ ! Ping(currentTime))

    // Every simulant is busy until its Pong for this tick arrives.
    busySimulants = Set.empty ++ allSimulants
  }
}
// In file concurrency/ParallelSimulation.scala
/**
 * Removes from the front of the agenda every item whose time is at or
 * before `currentTime` and sends each item's message to its target.
 */
private def processCurrentEvents(): Unit = {
  // Split at the first item that is not yet due.
  val (dueNow, later) = agenda.span(_.time <= currentTime)
  agenda = later

  for (WorkItem(time, msg, target) <- dueNow) {
    // Nothing should be overdue — presumably the agenda is kept ordered by time; confirm against insert.
    assert(time == currentTime)
    target ! msg
  }
}
// In file concurrency/ParallelSimulation.scala
/**
 * Handles a single clock message:
 *  - `AfterDelay`: adds a work item for `currentTime + delay` to the agenda
 *  - `Pong`: marks the sending simulant as no longer busy
 *  - `Start`: sets `running`
 *  - `Stop`: forwards `Stop` to every simulant and exits
 */
def reactToOneMessage(): Unit = {
  react {
    case AfterDelay(delay, msg, target) =>
      agenda = insert(agenda, WorkItem(currentTime + delay, msg, target))

    case Pong(time, sim) =>
      assert(time == currentTime)
      assert(busySimulants contains sim)
      busySimulants -= sim

    case Start =>
      running = true

    case Stop =>
      allSimulants.foreach(_ ! Stop)
      exit()
  }
}
// In file concurrency/ParallelSimulation.scala
/**
 * Simulant main loop: exits on `Stop`, answers each `Ping` with a `Pong`
 * (calling `simStarting()` first on the ping for time 1), and passes every
 * other message to `handleSimMessage`.
 */
def act(): Unit =
  loop {
    react {
      case Stop =>
        exit()
      case Ping(tick) =>
        if (tick == 1)
          simStarting()
        clock ! Pong(tick, self)
      case other =>
        handleSimMessage(other)
    }
  }
// In file concurrency/ParallelSimulation.scala
/**
 * Base trait for simulation participants.
 *
 * Implementors supply the `clock` and `handleSimMessage`; they may override
 * `simStarting`, which is called on the ping for time 1.
 */
trait Simulant extends Actor {
  /** The clock this simulant reports to. */
  val clock: Clock

  /** Handles any message other than `Stop` and `Ping`. */
  def handleSimMessage(msg: Any): Unit

  /** Hook called on the ping for time 1; does nothing by default. */
  def simStarting(): Unit = {}

  def act(): Unit =
    loop {
      react {
        case Stop =>
          exit()
        case Ping(tick) =>
          if (tick == 1)
            simStarting()
          clock ! Pong(tick, self)
        case other =>
          handleSimMessage(other)
      }
    }

  // The actor is started from the trait initializer, i.e. during construction.
  // NOTE(review): this runs before subclass vals such as `clock` are
  // assigned — confirm no message can arrive before construction completes.
  start()
}
// In file concurrency/Circuit.scala class Circuit { val clock = new Clock // simulation messages // delay constants // Wire and Gate classes and methods // misc. utility methods }
// In file concurrency/Circuit.scala
/** Tells a wire to take the signal value `sig`. */
case class SetSignal(sig: Boolean)

/** Tells an observer that `wire` now carries the signal value `sig`. */
case class SignalChanged(wire: Wire, sig: Boolean)
// In file concurrency/Circuit.scala
// Delay constants, passed as the `delay` of AfterDelay requests to the clock.
val WireDelay: Int = 1
val InverterDelay: Int = 2
val OrGateDelay: Int = 3
val AndGateDelay: Int = 3
// In file concurrency/Circuit.scala class Wire(name: String, init: Boolean) extends Simulant { def this(name: String) { this(name, false) } def this() { this("unnamed") } val clock = Circuit.this.clock clock.add(this) private var sigVal = init private var observers: List[Actor] = List()
// In file concurrency/Circuit.scala
/**
 * Handles `SetSignal`: when the value differs from the current one, stores
 * it and notifies the observers. No other message type is matched here.
 */
def handleSimMessage(msg: Any): Unit = msg match {
  case SetSignal(s) =>
    if (s != sigVal) {
      sigVal = s
      signalObservers()
    }
}

/** Asks the clock to send each observer a `SignalChanged` after `WireDelay`. */
def signalObservers(): Unit =
  observers.foreach { obs =>
    clock ! AfterDelay(
      WireDelay,
      SignalChanged(this, sigVal),
      obs)
  }
// In file concurrency/Circuit.scala
/** When the simulation starts, announces the wire's current signal to its observers. */
override def simStarting(): Unit =
  signalObservers()
// In file concurrency/Circuit.scala
/** Registers `obs` by prepending it to the observer list. */
def addObserver(obs: Actor): Unit = {
  observers = obs :: observers
}

/** Renders as `Wire(<name>)`. */
override def toString =
  "Wire("+ name +")"
// In file concurrency/Circuit.scala private object DummyWire extends Wire("dummy")
// In file concurrency/Circuit.scala abstract class Gate(in1: Wire, in2: Wire, out: Wire) extends Simulant {
// In file concurrency/Circuit.scala def computeOutput(s1: Boolean, s2: Boolean): Boolean
// In file concurrency/Circuit.scala val delay: Int
// In file concurrency/Circuit.scala val clock = Circuit.this.clock clock.add(this)
// In file concurrency/Circuit.scala in1.addObserver(this) in2.addObserver(this)
// In file concurrency/Circuit.scala var s1, s2 = false
// In file concurrency/Circuit.scala
/**
 * Handles `SignalChanged`: records the new value for whichever input wire
 * changed, then asks the clock to set the output wire to the recomputed
 * value after this gate's `delay`. No other message type is matched here.
 */
def handleSimMessage(msg: Any): Unit = msg match {
  case SignalChanged(w, sig) =>
    // Two independent checks, so a wire equal to both inputs updates both.
    if (w == in1) s1 = sig
    if (w == in2) s2 = sig
    val newOutput = computeOutput(s1, s2)
    clock ! AfterDelay(delay, SetSignal(newOutput), out)
}
// In file concurrency/Circuit.scala
/** Creates a gate driving `output` with `in1 || in2`, using `OrGateDelay`. */
def orGate(in1: Wire, in2: Wire, output: Wire) =
  new Gate(in1, in2, output) {
    val delay = OrGateDelay
    def computeOutput(a: Boolean, b: Boolean) = a || b
  }

/** Creates a gate driving `output` with `in1 && in2`, using `AndGateDelay`. */
def andGate(in1: Wire, in2: Wire, output: Wire) =
  new Gate(in1, in2, output) {
    val delay = AndGateDelay
    def computeOutput(a: Boolean, b: Boolean) = a && b
  }
// In file concurrency/Circuit.scala
/**
 * Creates a gate driving `output` with the negation of `input`, using
 * `InverterDelay`. `DummyWire` fills the unused second input.
 */
def inverter(input: Wire, output: Wire) =
  new Gate(input, DummyWire, output) {
    val delay = InverterDelay
    // The second argument is ignored.
    def computeOutput(value: Boolean, unused: Boolean) = !value
  }
// In file concurrency/Circuit.scala
/**
 * Attaches a simulant to `wire` that prints a line for every
 * `SignalChanged` message it receives.
 */
def probe(wire: Wire) = new Simulant {
  val clock = Circuit.this.clock
  clock.add(this)
  wire.addObserver(this)

  def handleSimMessage(msg: Any): Unit = msg match {
    case SignalChanged(changed, value) =>
      println("signal "+ changed +" changed to "+ value)
  }
}
// In file concurrency/Circuit.scala
/** Starts the simulation by sending `Start` to the clock. */
def start(): Unit =
  clock ! Start
// In file concurrency/Adders.scala
/** Adder circuits built from the gates of `Circuit`. */
trait Adders extends Circuit {

  /**
   * Wires up a half adder: `c` is `a AND b`, and `s` is
   * `(a OR b) AND NOT c`.
   */
  def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire): Unit = {
    val d = new Wire
    val e = new Wire
    orGate(a, b, d)
    andGate(a, b, c)
    inverter(c, e)
    andGate(d, e, s)
  }

  /**
   * Wires up a full adder from two half adders and an OR gate:
   * inputs `a`, `b`, `cin`; outputs `sum` and `cout`.
   */
  def fullAdder(a: Wire, b: Wire, cin: Wire,
      sum: Wire, cout: Wire): Unit = {
    val s = new Wire
    val c1 = new Wire
    val c2 = new Wire
    halfAdder(a, cin, s, c1)
    halfAdder(b, s, sum, c2)
    orGate(c1, c2, cout)
  }
}
val circuit = new Circuit with Adders
val circuit = new Circuit with Adders with Multiplexers with FlipFlops with MultiCoreProcessors
// In file concurrency/Demo.scala
/** Builds a full adder with probes on all five wires and starts the simulation. */
object Demo {
  def main(args: Array[String]): Unit = {
    val circuit = new Circuit with Adders
    import circuit._

    // Inputs: ain = true, bin = false, cin = true.
    val ain = new Wire("ain", true)
    val bin = new Wire("bin", false)
    val cin = new Wire("cin", true)
    // Outputs use the one-argument constructor, i.e. an initial value of false.
    val sout = new Wire("sout")
    val cout = new Wire("cout")

    // Probes are attached in the same order as before.
    for (wire <- List(ain, bin, cin, sout, cout))
      probe(wire)

    fullAdder(ain, bin, cin, sout, cout)

    circuit.start()
  }
}
Advancing to time 1 Advancing to time 2 signal Wire(cout) changed to false signal Wire(cin) changed to true signal Wire(ain) changed to true signal Wire(sout) changed to false signal Wire(bin) changed to false Advancing to time 3 Advancing to time 4 Advancing to time 5 Advancing to time 6 Advancing to time 7 Advancing to time 8 Advancing to time 9 Advancing to time 10 signal Wire(cout) changed to true Advancing to time 11 Advancing to time 12 Advancing to time 13 Advancing to time 14 Advancing to time 15 Advancing to time 16 Advancing to time 17 Advancing to time 18 signal Wire(sout) changed to true Advancing to time 19 Advancing to time 20 Advancing to time 21 signal Wire(sout) changed to false ** Agenda empty. Clock exiting at time 21.

30.7 Conclusion

For more information about Programming in Scala (the "Stairway Book"), please visit:

http://www.artima.com/shop/programming_in_scala

and:

http://booksites.artima.com/programming_in_scala

Copyright © 2007-2008 Artima, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.