Code Examples for

Actors in Scala

Return to chapter index

9 Distributed and Parallel Computing

  • 9.1 MapReduce
  • 9.2 Reliable broadcast
  • 9.1 MapReduce


    def invertedIndex(input: List[(File, List[String])]) = { val master = self val workers = for ((file, words) <- input) yield actor { val wordsAndFiles = for (word <- words) yield (word, file) master ! Intermediate(wordsAndFiles) } var intermediates = List[(String, File)]() for (_ <- 1 to input.length) receive { case Intermediate(list) => intermediates :::= list } var dict = Map[String, List[File]]() withDefault (k => List()) for ((word, file) <- intermediates) dict += (word -> (file :: dict(word))) var result = Map[String, List[File]]() for ((word, files) <- dict) result += (word -> files.distinct) result }
    case class Intermediate(list: List[(String, File)])
    def mapIndex(file: File, words: List[String]) = for (word <- words) yield (word, file)
    val workers = for ((key, value) <- input) yield actor { master ! Intermediate(mapping(key, value)) }
    var intermediates = List[(K2, V2)]() for (_ <- 1 to input.length) receive { case Intermediate(list) => intermediates :::= list }
    var dict = Map[K2, List[V2]]() withDefault (k => List()) for ((key, value) <- intermediates) dict += (key -> (value :: dict(key)))
    def reduceIndex(key: String, files: List[File]) = files.distinct
    var result = Map[K2, List[V2]]() for ((key, value) <- dict) result += (key -> reducing(key, value))
    def mapReduceBasic[K, V, K2, V2]( input: List[(K, V)], mapping: (K, V) => List[(K2, V2)], reducing: (K2, List[V2]) => List[V2] ): Map[K2, List[V2]] = { case class Intermediate(list: List[(K2, V2)]) val master = self val workers = for ((key, value) <- input) yield actor { master ! Intermediate(mapping(key, value)) } var intermediates = List[(K2, V2)]() for (_ <- 1 to input.length) receive { case Intermediate(list) => intermediates :::= list } var dict = Map[K2, List[V2]]() withDefault (k => List()) for ((key, value) <- intermediates) dict += (key -> (value :: dict(key))) var result = Map[K2, List[V2]]() for ((key, value) <- dict) result += (key -> reducing(key, value)) result }
    case class Intermediate[K2, V2](list: List[(K2, V2)])
    ...: error: type mismatch; found : List[(Any, Any)] required: List[(K2, V2)] case Intermediate(list) => intermediates :::= list ^ one error found
    def mapReduce[K, V, K2, V2]( input: List[(K, V)], mapping: (K, V) => List[(K2, V2)], reducing: (K2, List[V2]) => List[V2] ): Map[K2, List[V2]] = { case class Intermediate(list: List[(K2, V2)]) case class Reduced(key: K2, values: List[V2]) // ... val reducers = for ((key, values) <- dict) yield actor { master ! Reduced(key, reducing(key, values)) } var result = Map[K2, List[V2]]() for (_ <- 1 to dict.size) receive { case Reduced(key, values) => result += (key -> values) } result }
    def mapreduce[K, V, K2, V2]( input: List[(K, V)], mapping: (K, V) => List[(K2, V2)], reducing: (K2, List[V2]) => List[V2] ): Map[K2, List[V2]] = { case class Intermediate(list: List[(K2, V2)]) case class Reduced(key: K2, values: List[V2]) val master = self self.trapExit = true var assignedMappers = Map[Actor, (K, V)]() def spawnMapper(key: K, value: V) = { val mapper = link { master ! Intermediate(mapping(key, value)) } assignedMappers += (mapper -> (key, value)) mapper } for ((key, value) <- input) spawnMapper(key, value) var intermediates = List[(K2, V2)]() var nleft = input.length while (nleft > 0) receive { case Intermediate(list) => intermediates :::= list case Exit(from, 'normal) => nleft -= 1 case Exit(from, reason) => // retrieve assigned work val (key, value) = assignedMappers(from) // spawn new worker to re-execute the work spawnMapper(key, value) } // ...
    def coarseMapReduce[K, V, K2, V2]( input: List[(K, V)], mapping: (K, V) => List[(K2, V2)], reducing: (K2, List[V2]) => List[V2], numMappers: Int, numReducers: Int): Map[K2, List[V2]] = { case class Intermediate(list: List[(K2, V2)]) case class Reduced(key: K2, values: List[V2]) val master = self for (group <- input.grouped(input.length / numMappers)) actor { for ((key, value) <- group) master ! Intermediate(mapping(key, value)) } var intermediates = ... var dict = Map[K2, List[V2]]() withDefault (k => List()) for ((key, value) <- intermediates) dict += (key -> (value :: dict(key))) for (group <- dict.grouped(dict.size / numReducers)) actor { for ((key, values) <- group) master ! Reduced(key, reducing(key, values)) } var result = Map[K2, List[V2]]() for (_ <- 1 to dict.size) receive { case Reduced(key, values) => result += (key -> values) } result }

    9.2 Reliable broadcast


    abstract class BroadcastActor extends Actor { // can be set by external actor, therefore @volatile @volatile var isBroken = false private var canRun = true private var counter = 0L protected def broadcast(m: BSend) = if (!isBroken) { for (a <- m.recipients) a ! BDeliver(m.data) } else if (canRun) { canRun = false // simulate it being broken for (a <- m.recipients.take(2)) a ! BDeliver(m.data) println("error at " + this) } // to be overridden in subtraits protected def reaction: PartialFunction[Any, Unit] = { case BCast(msg, recipients) => counter += 1 broadcast(BSend(msg, recipients, counter)) case 'stop => exit() } def act = loopWhile (canRun) { react(reaction) } }
    case class BCast(data: Any, recipients: Set[Actor])
    case class BSend(data: Any, recipients: Set[Actor], timestamp: Long)
    case class BDeliver(data: Any)
    class MyActor extends BroadcastActor { override def reaction = super.reaction orElse { case BDeliver(data) => println("Received broadcast message: " + data + " at " + this) } }
    val a1 = new MyActor; a1.start() val a2 = new MyActor; a2.start() val a3 = new MyActor; a3.start() val a4 = new MyActor; a4.start() a1 ! Broadcast("Hello!", Set(a1, a2, a3, a4))
    Received broadcast message: Hello! at MyActor@3c3c9217 Received broadcast message: Hello! at MyActor@15af33d6 Received broadcast message: Hello! at MyActor@54520eb Received broadcast message: Hello! at MyActor@2c9b42e6
    a1.isBroken = true a1 ! Broadcast("Hello again!", Set(a1, a2, a3, a4))
    error at MyActor@15af33d6 Received broadcast message: Hello again! at MyActor@2c9b42e6
    class RbActor extends BroadcastActor { var delivered = Set[BSend]() override def reaction = super.reaction orElse { case m @ BSend(data, _, _) => if (!delivered.contains(m)) { delivered += m broadcast(m) this ! BDeliver(data) } } }
    protected def broadcast(m: BSend) = if (!isBroken) { for (a <- m.recipients) a ! m } else if (canRun) { canRun = false // simulate it being broken for (a <- m.recipients.take(2)) a ! m println("error at " + this) }
    class MyActor extends RbActor { override def reaction = super.reaction orElse { case BDeliver(data) => println("Received broadcast message: " + data + " at " + this) } }
    error at MyActor@28e70e30 Received broadcast message: Hello again! at MyActor@5954864a Received broadcast message: Hello again! at MyActor@3c3c9217 Received broadcast message: Hello again! at MyActor@1ff82982

    For more information about Actors in Scala, please visit:

    http://www.artima.com/shop/actors_in_scala

    and:

    http://booksites.artima.com/actors_in_scala

    Copyright © 2011 Artima, Inc. All rights reserved.

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.