Code Examples for

Programming in Scala, Fourth Edition

Return to chapter index

32 Futures and Concurrency

Sample run of chapter's interpreter examples

32.1 Trouble in paradise


// In file futures-and-concurrency/Synchronized.scala var counter = 0 synchronized { // One thread in here at a time counter = counter + 1 }

32.2 Asynchronous execution and \c{Try


scala> import scala.concurrent.Future import scala.concurrent.Future scala> val fut = Future { Thread.sleep(10000); 21 + 21 } <console>:11: error: Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global. val fut = Future { Thread.sleep(10000); 21 + 21 } ^
scala> import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.ExecutionContext.Implicits.global scala> val fut = Future { Thread.sleep(10000); 21 + 21 } fut: scala.concurrent.Future[Int] = ...
scala> fut.isCompleted res0: Boolean = false scala> fut.value res1: Option[scala.util.Try[Int]] = None
scala> fut.isCompleted res2: Boolean = true scala> fut.value res3: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val fut = Future { Thread.sleep(10000); 21 / 0 } fut: scala.concurrent.Future[Int] = ... scala> fut.value res4: Option[scala.util.Try[Int]] = None
scala> fut.value res5: Option[scala.util.Try[Int]] = Some(Failure(java.lang.ArithmeticException: / by zero))

32.3 Working with \c{Future


scala> val fut = Future { Thread.sleep(10000); 21 + 21 } fut: scala.concurrent.Future[Int] = ...
scala> val result = fut.map(x => x + 1) result: scala.concurrent.Future[Int] = ... scala> result.value res5: Option[scala.util.Try[Int]] = None
scala> result.value res6: Option[scala.util.Try[Int]] = Some(Success(43))
scala> val fut1 = Future { Thread.sleep(10000); 21 + 21 } fut1: scala.concurrent.Future[Int] = ... scala> val fut2 = Future { Thread.sleep(10000); 23 + 23 } fut2: scala.concurrent.Future[Int] = ...
scala> for { x <- fut1 y <- fut2 } yield x + y res7: scala.concurrent.Future[Int] = ...
scala> res7.value res8: Option[scala.util.Try[Int]] = Some(Success(88))
scala> for { x <- Future { Thread.sleep(10000); 21 + 21 } y <- Future { Thread.sleep(10000); 23 + 23 } } yield x + y res9: scala.concurrent.Future[Int] = ... scala> res9.value res27: Option[scala.util.Try[Int]] = None scala> // Will need at least 20 seconds to complete scala> res9.value res28: Option[scala.util.Try[Int]] = Some(Success(88))
scala> Future.successful { 21 + 21 } res2: scala.concurrent.Future[Int] = ...
scala> Future.failed(new Exception("bummer!")) res3: scala.concurrent.Future[Nothing] = ...
scala> import scala.util.{Success,Failure} import scala.util.{Success, Failure} scala> Future.fromTry(Success { 21 + 21 }) res4: scala.concurrent.Future[Int] = ... scala> Future.fromTry(Failure(new Exception("bummer!"))) res5: scala.concurrent.Future[Nothing] = ...
scala> val pro = Promise[Int] pro: scala.concurrent.Promise[Int] = ... scala> val fut = pro.future fut: scala.concurrent.Future[Int] = ... scala> fut.value res8: Option[scala.util.Try[Int]] = None
scala> pro.success(42) res9: pro.type = ... scala> fut.value res10: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val fut = Future { 42 } fut: scala.concurrent.Future[Int] = ... scala> val valid = fut.filter(res => res > 0) valid: scala.concurrent.Future[Int] = ... scala> valid.value res0: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val invalid = fut.filter(res => res < 0) invalid: scala.concurrent.Future[Int] = ... scala> invalid.value res1: Option[scala.util.Try[Int]] = Some(Failure(java.util.NoSuchElementException: Future.filter predicate is not satisfied))
scala> val valid = for (res <- fut if res > 0) yield res valid: scala.concurrent.Future[Int] = ... scala> valid.value res2: Option[scala.util.Try[Int]] = Some(Success(42)) scala> val invalid = for (res <- fut if res < 0) yield res invalid: scala.concurrent.Future[Int] = ... scala> invalid.value res3: Option[scala.util.Try[Int]] = Some(Failure(java.util.NoSuchElementException: Future.filter predicate is not satisfied))
scala> val valid = fut collect { case res if res > 0 => res + 46 } valid: scala.concurrent.Future[Int] = ... scala> valid.value res17: Option[scala.util.Try[Int]] = Some(Success(88))
scala> val invalid = fut collect { case res if res < 0 => res + 46 } invalid: scala.concurrent.Future[Int] = ... scala> invalid.value res18: Option[scala.util.Try[Int]] = Some(Failure(java.util.NoSuchElementException: Future.collect partial function is not defined at: 42))
scala> val failure = Future { 42 / 0 } failure: scala.concurrent.Future[Int] = ... scala> failure.value res23: Option[scala.util.Try[Int]] = Some(Failure(java.lang.ArithmeticException: / by zero)) scala> val expectedFailure = failure.failed expectedFailure: scala.concurrent.Future[Throwable] = ... scala> expectedFailure.value res25: Option[scala.util.Try[Throwable]] = Some(Success(java.lang.ArithmeticException: / by zero))
scala> val success = Future { 42 / 1 } success: scala.concurrent.Future[Int] = ... scala> success.value res21: Option[scala.util.Try[Int]] = Some(Success(42)) scala> val unexpectedSuccess = success.failed unexpectedSuccess: scala.concurrent.Future[Throwable] = ... scala> unexpectedSuccess.value res26: Option[scala.util.Try[Throwable]] = Some(Failure(java.util.NoSuchElementException: Future.failed not completed with a throwable.))
scala> val fallback = failure.fallbackTo(success) fallback: scala.concurrent.Future[Int] = ... scala> fallback.value res27: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val failedFallback = failure.fallbackTo( Future { val res = 42; require(res < 0); res } ) failedFallback: scala.concurrent.Future[Int] = ... scala> failedFallback.value res28: Option[scala.util.Try[Int]] = Some(Failure(java.lang.ArithmeticException: / by zero))
scala> val recovered = failedFallback recover { case ex: ArithmeticException => -1 } recovered: scala.concurrent.Future[Int] = ... scala> recovered.value res32: Option[scala.util.Try[Int]] = Some(Success(-1))
scala> val unrecovered = fallback recover { case ex: ArithmeticException => -1 } unrecovered: scala.concurrent.Future[Int] = ... scala> unrecovered.value res33: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val alsoUnrecovered = failedFallback recover { case ex: IllegalArgumentException => -2 } alsoUnrecovered: scala.concurrent.Future[Int] = ... scala> alsoUnrecovered.value res34: Option[scala.util.Try[Int]] = Some(Failure(java.lang.ArithmeticException: / by zero))
scala> val alsoRecovered = failedFallback recoverWith { case ex: ArithmeticException => Future { 42 + 46 } } alsoRecovered: scala.concurrent.Future[Int] = ... scala> alsoRecovered.value res35: Option[scala.util.Try[Int]] = Some(Success(88))
scala> val first = success.transform( res => res * -1, ex => new Exception("see cause", ex) ) first: scala.concurrent.Future[Int] = ...
scala> first.value res42: Option[scala.util.Try[Int]] = Some(Success(-42))
scala> val second = failure.transform( res => res * -1, ex => new Exception("see cause", ex) ) second: scala.concurrent.Future[Int] = ... scala> second.value res43: Option[scala.util.Try[Int]] = Some(Failure(java.lang.Exception: see cause))
scala> val firstCase = success.transform { // Scala 2.12 case Success(res) => Success(res * -1) case Failure(ex) => Failure(new Exception("see cause", ex)) } first: scala.concurrent.Future[Int] = ... scala> firstCase.value res6: Option[scala.util.Try[Int]] = Some(Success(-42)) scala> val secondCase = failure.transform { case Success(res) => Success(res * -1) case Failure(ex) => Failure(new Exception("see cause", ex)) } secondCase: scala.concurrent.Future[Int] = ... scala> secondCase.value res8: Option[scala.util.Try[Int]] = Some(Failure(java.lang.Exception: see cause))
scala> val nonNegative = failure.transform { // Scala 2.12 case Success(res) => Success(res.abs + 1) case Failure(_) => Success(0) } nonNegative: scala.concurrent.Future[Int] = ... scala> nonNegative.value res11: Option[scala.util.Try[Int]] = Some(Success(0))
scala> val zippedSuccess = success zip recovered zippedSuccess: scala.concurrent.Future[(Int, Int)] = ... scala> zippedSuccess.value res46: Option[scala.util.Try[(Int, Int)]] = Some(Success((42,-1)))
scala> val zippedFailure = success zip failure zippedFailure: scala.concurrent.Future[(Int, Int)] = ... scala> zippedFailure.value res48: Option[scala.util.Try[(Int, Int)]] = Some(Failure(java.lang.ArithmeticException: / by zero))
scala> val fortyTwo = Future { 21 + 21 } fortyTwo: scala.concurrent.Future[Int] = ... scala> val fortySix = Future { 23 + 23 } fortySix: scala.concurrent.Future[Int] = ... scala> val futureNums = List(fortyTwo, fortySix) futureNums: List[scala.concurrent.Future[Int]] = ... scala> val folded = Future.foldLeft(futureNums)(0) { (acc, num) => acc + num } folded: scala.concurrent.Future[Int] = ... scala> folded.value res53: Option[scala.util.Try[Int]] = Some(Success(88))
scala> val reduced = Future.reduceLeft(futureNums) { (acc, num) => acc + num } reduced: scala.concurrent.Future[Int] = ... scala> reduced.value res54: Option[scala.util.Try[Int]] = Some(Success(88))
scala> val futureList = Future.sequence(futureNums) futureList: scala.concurrent.Future[List[Int]] = ... scala> futureList.value res55: Option[scala.util.Try[List[Int]]] = Some(Success(List(42, 46)))
scala> val traversed = Future.traverse(List(1, 2, 3)) { i => Future(i) } traversed: scala.concurrent.Future[List[Int]] = ... scala> traversed.value res58: Option[scala.util.Try[List[Int]]] = Some(Success(List(1, 2, 3)))
scala> failure.foreach(ex => println(ex)) scala> success.foreach(res => println(res)) 42
scala> for (res <- failure) println(res) scala> for (res <- success) println(res) 42
scala> import scala.util.{Success, Failure} import scala.util.{Success, Failure} scala> success onComplete { case Success(res) => println(res) case Failure(ex) => println(ex) } 42 scala> failure onComplete { case Success(res) => println(res) case Failure(ex) => println(ex) } java.lang.ArithmeticException: / by zero
scala> val newFuture = success andThen { case Success(res) => println(res) case Failure(ex) => println(ex) } 42 newFuture: scala.concurrent.Future[Int] = ... scala> newFuture.value res76: Option[scala.util.Try[Int]] = Some(Success(42))
scala> val nestedFuture = Future { Future { 42 } } nestedFuture: Future[Future[Int]] = ... scala> val flattened = nestedFuture.flatten // Scala 2.12 flattened: scala.concurrent.Future[Int] = Future(Success(42))
scala> val futNum = Future { 21 + 21 } futNum: scala.concurrent.Future[Int] = ... scala> val futStr = Future { "ans" + "wer" } futStr: scala.concurrent.Future[String] = ... scala> val zipped = futNum zip futStr zipped: scala.concurrent.Future[(Int, String)] = ... scala> val mapped = zipped map { case (num, str) => s"$num is the $str" } mapped: scala.concurrent.Future[String] = ... scala> mapped.value res2: Option[scala.util.Try[String]] = Some(Success(42 is the answer))
scala> val fut = futNum.zipWith(futStr) { // Scala 2.12 case (num, str) => s"num is the str" } zipWithed: scala.concurrent.Future[String] = ... scala> fut.value res3: Option[scala.util.Try[String]] = Some(Success(42 is the answer))
scala> val flipped = success.transformWith { // Scala 2.12 | case Success(res) => | Future { throw new Exception(res.toString) } | case Failure(ex) => Future { 21 + 21 } | } flipped: scala.concurrent.Future[Int] = ... scala> flipped.value res5: Option[scala.util.Try[Int]] = Some(Failure(java.lang.Exception: 42))

32.4 Testing with \c{Future


scala> import scala.concurrent.Await import scala.concurrent.Await scala> import scala.concurrent.duration._ import scala.concurrent.duration._ scala> val fut = Future { Thread.sleep(10000); 21 + 21 } fut: scala.concurrent.Future[Int] = ... scala> val x = Await.result(fut, 15.seconds) // blocks x: Int = 42
scala> import org.scalatest.matchers.should.Matchers._ import org.scalatest.matchers.should.Matchers._ scala> x should be (42) res0: org.scalatest.Assertion = Succeeded
scala> import org.scalatest.concurrent.ScalaFutures._ import org.scalatest.concurrent.ScalaFutures._ scala> val fut = Future { Thread.sleep(10000); 21 + 21 } fut: scala.concurrent.Future[Int] = ... scala> fut.futureValue should be (42) // futureValue blocks res1: org.scalatest.Assertion = Succeeded
// In file futures-and-concurrency/AddSpec.scala import org.scalatest.funspec.AsyncFunSpec import scala.concurrent.Future class AddSpec extends AsyncFunSpec { def addSoon(addends: Int*): Future[Int] = Future { addends.sum } describe("addSoon") { it("will eventually compute a sum of passed Ints") { val futureSum: Future[Int] = addSoon(1, 2) // You can map assertions onto a Future, then return // the resulting Future[Assertion] to ScalaTest: futureSum map { sum => assert(sum == 3) } } } }

32.5 Conclusion

For more information about Programming in Scala, Fourth Edition (the "Stairway Book"), please visit:

http://www.artima.com/shop/programming_in_scala_4ed

and:

http://booksites.artima.com/programming_in_scala_4ed

Copyright © 2007-2019 Artima, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.