5.2 In the Future nobody blocks

是时间来构建 TicketInfoService,我们不想在任何线程上空等。 按照我们的想法,单元测试是唯一可以阻塞在 future 的例子,因为它可以把验证结果的过程简化。 即使这样,我们还是偏爱把阻塞推迟到测试用例的最后。 我们从 TicketInfo 服务开始,试着执行下图中的两个步骤,这样我们就可以提供到达活动的路线的交通信息。

Figure 5.6 获取关于活动的交通信息

第一步是获取活动的票号。 同步调用函数和异步调用的巨大区别在于你定义的程序的流。 下边列表中展示了获取给定票号的活动的同步 web service。

Listing 5.1 同步调用

    val request = EventRequest(ticketNr)
    val response:EventResponse = callEventService(request)
    val event:Event = response.event
  1. 创建请求
  2. 阻塞主线程,直到回应完成
  3. 读出活动的值

图中展示了在主线程上执行的三行代码。 流非常简单,函数被调用,在同一线程上的返回值立即可以访问。 显然,程序不能在同一线程上继续执行,直到返回值可以访问。 Scala 表达式默认是严格的,上述代码不使用任何惰性计算。 因此上述代码的每一行都需要产生完整的值。

我们来看看我们需要对什么来把这个同步 web service 调用改造为异步的。 在上边的例子中, callEventService 是阻塞地调用 web service,它需要在线程上等待回应。 我们首先把 callEventService 包装在一个在另一个线程执行的代码块中。

下图中展示了代码的变化:

Listing 5.2 异步调用

val request = EventRequest(ticketNr)
val futureEvent:Future[Event] = future {
    val response = callEventService(request)
    response.event
}
  1. 异步调用代码块,返回 Future 活动的结果。
  2. 这在另一个线程上运行。
  3. 应答中的活动可以在另一个线程中访问。

异步流做了微小的调整,它在另一个线程中运行 callEventService。 另一个线程被 callEventService 阻塞,之后会解决这个问题。 future { ... } 是调用 Future 对象的简写,将代码块作为唯一的参数,即 Future.apply(codeblock)。 这个方法在 scala.concurrent 包中,使用时只需要引入 scala.concurrent._。 这是一个异步调用代码块,得到 Future[T] 的助手函数, 在本例中是 Future[Event], 因为我们在下一个调用中需要 Event 来获取交通信息。 如果你对 Scala 不熟悉,代码块中最后的表达式是返回值。 Future.apply 方法确保最后的表达式转变成了 Future[Event]。 futureEvent 值的类型显式定义在本例中,但是由于 Scala 中有类型推导,这个可以忽略。

上述代码块实际是个闭包(Closure)。 闭包是特殊类型的函数,可以在其闭合的范围内使用值。 在 Scala 中,你不需要用特殊动作来创建闭包,但是重要的是注意到这是特殊类型的函数。 代码块引用到了另一个线程中的请求值,这是我们桥接主线程和其他线程的方式,将请求传递给 web service。

很好, web service 现在在另一个线程上调用,我们可以在那里处理应答。 在下边的列表中,我们来看看如何把对 callTrafficService 的调用链接起来,获取活动的交通信息。 第一步,我们在终端上打印出到活动的路线:

Listing 5.3 处理活动结果

futureEvent.foreach { event =>
  val trafficRequest = TrafficRequest(destination = event.location, arrivalTime = event.time)
  val trafficResponse = callTrafficService(trafficRequest)
  println(trafficResponse.route)
}
  1. 异步处理活动结果,当结果可用时。
  2. 以基于活动的请求来调用交通服务。
  3. 在终端上打印出路线。

上述列表对 Future 使用了 foreach 方法,当活动结果可用时,将其作为参数调用代码块。 只有在 callEventService 成功后,才会调用代码块。

在这个例子中,我们希望之后也能使用 Route, 因此最好能返回 Future[Route]。 foreach 方法返回 Unit,所以我们得使用其他东西。 下边的例子展示了如何使用 map 方法做到这一点。

Listing 5.4 串联活动结果

val futureRoute:Future[Route] = futureEvent.map { event =>
  val trafficRequest = TrafficRequest(destination = event.location, arrivalTime = event.time)
  val trafficResponse = callTrafficService(trafficRequest)
  trafficResponse.route
}
  1. 处理活动,返回 Future[Route]。
  2. 把值返回给 map 函数,该函数把值转换为 Future[Route]。

使用 scala.collections 和类似 Option 和 Either 的标准类型,你应该很熟悉 foreach 和 map。 从概念上讲, Future.map 方法与 Option.map 方法很类似。 Option.map 方法调用含有值的代码块,返回一个新的 Option[T] 值, Future.map 调用含有成功结果的代码块,返回一个新的 Future[T] 值。 在本例中是 Future[Route],因为代码块的最后一行返回 Route 值。 再说一次,futureRoute 的类型是显式定义的,可以忽略。 下边的代码展示了你可以直接串联两个 web service 调用。

Listing 5.5 getRoute 方法,返回 Future[Route] 结果

val request = EventRequest(ticketNr)
val futureRoute = future {
  callEventService(request).event
    }.map { event =>
        val trafficRequest = TrafficRequest(destination = event.location,
      arrivalTime = event.time)
  callTrafficService(trafficRequest).route
}
  1. 串联 Future[Event]。
  2. 返回路线。

如果我们重构为接受 ticketNr 参数的 getEvent 方法和接受活动参数的 getRoute 方法,则下边列表中的代码会串联起两个调用。 方法 getEvent 和 getRoute 分别返回 Future[Event] 和 Futurep[Route]。

Listing 5.6 重构版

val futureRoute = getEvent(ticketNr).map { event =>
  getRoute(event)
}

前一个阻塞调用的例子中的 callEventService 和 callTrafficService 方法调用展示了调用从同步转变为异步。 为了真正从异步风格中获益,上述 getEvent 和 getRoute 应该用非阻塞 I/O 实现,并直接返回 futures 以使阻塞线程量最小。 HTTP 中这样 API 的例子是 spray-client 库,它构建在 Akka actors 和 Java NIO 库基础上, Java NIO 库利用了底层操作系统的设施,如选择器和通道(TODO:what?)。 下一节中,我们假定 web service 调用使用 spray-client 库实现的, spray-client 的内容会在 REST 章中介绍。

目前被忽略的细节就是,你需要提供使用 futures 所需要的隐式 ExecutionContext。 如果不提供,代码不能编译。 下边的片段展示了如何为全局执行上下文引入隐式的值。

Listing 5.7 处理活动结果

import scala.concurrent.Implicits.global
  1. 使用全局 ExecutionContext。

ExecutionContext 是对特定线程池实现中任务执行的抽象。 如果你对 java.util.concurrent 包熟悉,它可以比作 java.util.concurrent.Exector 接口以及增加的内容。 如果安全策略允许,全局 ExecutionContext 使用 ForkJoinPool, 否则回退使用 ThreadPoolExecutor。 在 Futures 和 Actors 一节中,我们即将看到,actor 系统的分发器(dispatcher)也可以作为 ExectionContext 使用。

截止到现在,我们只看了串联成功的函数结果。 下一节中,我们给你展示如果从错误结果中恢复。