//example from "Actors in Scala" chapter 9
//  def mapreduce[K, V, K2, V2](
//    input: List[(K, V)],
//    mapping: (K, V) => List[(K2, V2)],
//    reducing: (K2, List[V2]) => List[V2]
//  ): Map[K2, List[V2]] = {
//    case class Intermediate(list: List[(K2, V2)])
//    case class Reduced(key: K2, values: List[V2])
//    val master = self
//    self.trapExit = true
//    var assignedMappers = Map[Actor, (K, V)]()
//    def spawnMapper(key: K, value: V) = {
//      val mapper = link {
//        master ! Intermediate(mapping(key, value))
//      }
//      assignedMappers += (mapper -> (key, value))
//      mapper
//    }
//    for ((key, value) <- input)
//      spawnMapper(key, value)
//    var intermediates = List[(K2, V2)]()
//    var nleft = input.length
//    while (nleft > 0)
//      receive {
//        case Intermediate(list) => intermediates :::= list
//        case Exit(from, 'normal) => nleft -= 1
//        case Exit(from, reason) =>
//          // retrieve assigned work
//          val (key, value) = assignedMappers(from)
//          // spawn new worker to re-execute the work
//          spawnMapper(key, value)
//      }
//    var dict = Map[K2, List[V2]]() withDefault (k => List())
//    for ((key, value) <- intermediates)
//      dict += (key -> (value :: dict(key)))
//    val reducers = for ((key, values) <- dict) yield
//      actor {
//        master ! Reduced(key, reducing(key, values))
//      }
//    var result = Map[K2, List[V2]]()
//TODO also consider failures of the reducers
//    for (_ <- 1 to dict.size)
//      receive {
//        case Reduced(key, values) =>
//          result += (key -> values)
//      }
//    result
//  }
//To make this work, we need to bound the number of failures.
// The clean solution would be a fairness constraint that forces some progress, but that is not yet implemented.
// For the moment we add a failure counter that prevents failures from occurring infinitely often.

// Graph-rewriting model of the mapreduce example quoted in the header comment.
// NOTE(review): the remarks below read the notation at face value -- "pre"/"post" as the
// patterns before/after a rewrite, "==>"/"<==" as node correspondences between them,
// "no" as an additional inhibiting pattern, "*" as a replicated node, "_" as a wildcard
// label. Confirm against the tool's input-format documentation before relying on them.

// Initial configuration: one master connected to starred input nodes, plus a starred
// set of failure nodes (presumably the failure counter mentioned above).
init
    (m, master) -> (i, input)*
    node (f, failure)*

// A master->input edge is replaced by master->mapper->input
// (presumably models spawnMapper being called for one input).
transition "master: make mappers"
pre
    (m, master) -> (i, input)
post
    (m, master) -> (w, mapper)
    (w, mapper) -> (i, input)
==>
    m -> m
    i -> i
<==

// The master node is relabelled master -> master1; the "no" pattern is a remaining
// master->input edge (presumably: every input has been handed to a mapper).
transition "master: mappers created"
pre
    node (m, master)
post
    node (m, master1)
==>
    m -> m
<==
no
    (m, master) -> (i, input)
==>
    m -> m

// Under master1, a crash node holding an input is replaced by a fresh mapper on that
// same input (presumably the re-spawn on an abnormal Exit in the Scala example).
transition "master: mapper died"
pre
    (m, master1) -> (c, crash)
    (c, crash) -> (i, input)
post
    (m, master1) -> (w, mapper)
    (w, mapper) -> (i, input)
==>
    m -> m
    i -> i
<==

// master1 -> master2; the "no" pattern is any node (wildcard label) between master1
// and an input, i.e. a mapper or crash node still attached to an input.
transition "master: mapping done"
pre
    node (m, master1)
post
    node (m, master2)
==>
    m -> m
<==
no
    (m, master1) -> (w, _)
    (w, _) -> (i, input)
==>
    m -> m

// The [inter]-labelled edge from master2 to a key is replaced by master2->reducer->key.
transition "master: reducer"
pre
    (m, master2) -> (k1, key) [inter]
post
    (m, master2) -> (w, reducer)
    (w, reducer) -> (k1, key)
==>
    m -> m
    k1 -> k1
<==

// master2 -> master3; the "no" pattern is a remaining [inter] edge from master2 to a key.
transition "master: reducers created"
pre
    node (m, master2)
post
    node (m, master3)
==>
    m -> m
<==
no
    (m, master2) -> (k, key) [inter]
==>
    m -> m

// Under master3, a crash node holding a key is replaced by a fresh reducer on that key.
// (The quoted Scala code leaves reducer failures as a TODO; this model includes them.)
transition "master: reducer died"
pre
    (m, master3) -> (c, crash)
    (c, crash) -> (k1, key)
post
    (m, master3) -> (w, reducer)
    (w, reducer) -> (k1, key)
==>
    m -> m
    k1 -> k1
<==

// master3 -> master4; the "no" pattern is any node (wildcard label) between master3
// and a key, i.e. a reducer or crash node still attached to a key.
transition "master: done"
pre
    node (m, master3)
post
    node (m, master4)
==>
    m -> m
<==
no
    (m, master3) -> (w, _)
    (w, _) -> (k1, key)
==>
    m -> m

// The mapper and its input occur only in pre. Post connects the master (any label) by
// [inter] edges to starred key nodes, each pointing to doubly-starred value nodes.
// NOTE(review): post also lists starred failure nodes that are not in pre -- confirm
// this is intended, given the stated goal of bounding the number of failures.
transition "mapper: ok"
pre
    (m, _) -> (w, mapper)
    (w, mapper) -> (i, input)
post
    (m, _) -> (k, key)* [inter]
    (k, key)* -> (v, value)**
    node (f, failure)*
==>
<==
    m -> m

// Requires one failure node in pre, and that node is absent from post (presumably it is
// consumed). The mapper is replaced by a crash node that keeps the same input.
transition "mapper: crash"
pre
    (m, _) -> (w, mapper)
    (w, mapper) -> (i, input)
    node (f, failure)
post
    (m, _) -> (c, crash)
    (c, crash) -> (i, input)
==>
    i -> i
<==
    m -> m

// The reducer and its key k1 occur only in pre. Post connects the master (any label) by
// a [result] edge to a key k2 that points to a single value node. The "no" pattern is
// an edge from k1 to a value.
// NOTE(review): post also lists starred failure nodes that are not in pre -- confirm
// this is intended, given the stated goal of bounding the number of failures.
transition "reducer: ok"
pre
    (m, _) -> (w, reducer)
    (w, reducer) -> (k1, key)
post
    (m, _) -> (k2, key) [result]
    (k2, key) -> (v1, value)
    node (f, failure)*
==>
<==
    m -> m
no
    (k1, key) -> (v2, value)
==>
    k1 -> k1

// Requires one failure node in pre, and that node is absent from post (presumably it is
// consumed). The reducer is replaced by a crash node that keeps the same key.
transition "reducer: crash"
pre
    (m, _) -> (w, reducer)
    (w, reducer) -> (k1, key)
    node (f, failure)
post
    (m, _) -> (c, crash)
    (c, crash) -> (k1, key)
==>
    k1 -> k1
<==
    m -> m