5.4 组合 Futures
上一节中,我们介绍了 map 和 flatMap 用 futures 把异步函数串联起来。 本节中,我们来看看用 futures 来组合异步函数的其他方法。 Future[T] 特质和 Future 对象提供像 flatMap 和 map 的组合方法(combinator methods)来组合 futures。 组合方法非常类似 Scala Collections API 中的 flatMap, map 以及其他数据结构。 他们使得可以创建转换一个不可变集合到下一个的流水线,一步一步地解决问题。 本节中,我们蜻蜓点水般地看看以函数方式组合 futures 的可能性。 如果你想了解更多关于用 scala 进行函数式编程的内容,我们推荐 Paul Chiusano and Rúnar Bjarnason 编著的《Functional Programming in Scala》
TicketInfo 服务需要组合多个 web service 调用来提供额外信息。 我们使用组合方法来一步一步给 TicketInfo 增加信息,使用以 TicketInfo 为参数,返回 Future[TicketInfo] 的函数。 每一步中产生一个 TicketInfo 的 case 类的备份,传给下一个函数,最终构建一个完整的 TicketInfo 值。 TicketInfo case 类被更新,展示在下边的列表中,包括在服务中使用的其他 case 类。
Listing 5.14 改进的 TicketInfo 类
case class TicketInfo(ticketNr:String,
                      userLocation:Location,
                      event:Option[Event]=None,
                      travelAdvice:Option[TravelAdvice]=None,
                      weather:Option[Weather]=None,
                      suggestions:Seq[Event]=Seq())
case class Event(name:String,location:Location,
                 time:DateTime)
case class Weather(temperature:Int, precipitation:Boolean)
case class RouteByCar(route:String,
                      timeToLeave:DateTime,
                      origin:Location,
                      destination:Location,
                      estimatedDuration:Duration,
                      trafficJamTime:Duration)
case class TravelAdvice(routeByCar:Option[RouteByCar]=None,
                        publicTransportAdvice: Option[PublicTransportAdvice]=None)
case class PublicTransportAdvice(advice:String,
                                 timeToLeave:DateTime,
                                 origin:Location,
                                 destination:Location,
                                 estimatedDuration:Duration)
case class Location(lat:Double, lon:Double)
case class Artist(name:String, calendarUri:String)
- TicketInfo case 类收集旅行建议,天气,和活动建议。
- 在本例中,为保持简单,路线只是个字符串。
- 在本例中,为保持简单,建议只是个字符串。
除了票号和用户位置,其他项目都是可选的。 流中的每一步会增加信息,通过拷贝参数 TicketInfo 和修改新 TicketInfo 中的属性,将其传递给下一个参数。 如我们在 future 错误一节看到的,如果一个服务调用不能完成,相连的信息被置为空。 下图展示了异步 web service 调用流,和我们在本例中即将使用的组合器(combinators)。
Figure 5.9 TicketInfoService 流
上图中组合器以菱形表示。 我们来看看每个组合器的细节。 流从 ticketNr 和票务信息服务的用户位置开始,最终完成一个名为 TicketInfo 的 future 结果。 使用的是应答最快的天气服务。 公交和汽车路线信息在 TravelAdvice 中组合。 同时检索相似的艺术家,请求获取每个艺术家的日程。 这会产生对相似活动的建议。 所有的 futures 最终组合成一个 Future[TicketInfo]。 最后这个最终的 Future[TicketInfo] 会有一个 onComplete 的回调,用一个对用户的应答完成 HTTP 请求,在这些例子中我们会忽略。
我们从组合天气服务开始。 TicketInfo 服务需要并行地调用多个天气服务,然后使用最快的应答。 下图展示了流中使用的组合器。
Figure 5.10 天气流
两个天气服务都返回 Future[Weather],为了下一步的需要,这需要转换为 Future[TicketInfo]。 如果其中一个天气服务没有响应,我们仍可以用另一个服务的应答来通知用户天气。 下边的列表展示了咋爱 TicketInfoService 流中如何用 Future.firstCompleteOf 方法来响应首先完成的服务:
Listing 5.15 使用 firstCompletedOf 来获取最快的应答
def getWeather(ticketInfo:TicketInfo):Future[TicketInfo] = {
  val futureWeatherX = callWeatherXService(ticketInfo)
                        .recover(withNone)
  val futureWeatherY = callWeatherYService(ticketInfo)
                        .recover(withNone)
  val futures = Seq(futureWeatherX, futureWeatherY)
  val fastestResponse = Future.firstCompletedOf(futures)
  fastestResponse.map{ weatherResponse =>
    ticketInfo.copy(weather = weatherResponse)
  }
}
- 错误恢复抽取到 withNone 函数中,这里忽略了。它只是用 None 值进行恢复。
- 首先完成的 Future[Weather]。
- 把天气应答复制到一个新的 TicketInfo 中,将其作为 map 代码块的结果返回。
- map 代码块吧完成的 Weather 值转换为 TicketInfo,生成一个 Future[TicketInfo]。
首先为两个天气服务请求创建两个 futures。 Future.firstCompletedOf 函数根据两个提供的天气服务 future 结果创建一个新的 Future。 重要的是注意到 firstCompleteOf 返回第一个完成的 future。 Future 以成功值或失败完成。 使用上述代码,当遇到类似 WeatherX 服务失败地比可以返回正确结果的 WeatherY 服务快时,ticketInfo 服务就不能添加天气信息。 现在这么做可行,因为我们假设不能应答的服务或表现较差的服务反应回避正常运行的服务慢。 后边会看到, 自己来实现 firstSucceededOf 方法不会太难。 【一旦我们解释了 Promises 并实现了 firstSucceededOf 方法,我们在回到这个话题】 【TODO 这个解释使得我们必须解释 Promises 并深入,如果走的太远,可能涉及到实现自己的 firstSucceededOf】
公共交通和汽车路线服务需要并行处理,如果两个结果都可用,就合并成一个 TravelAdvice。 下图展示了流中使用的增加旅行建议的组合器。
Figure 5.11 旅行建议流
getTraffic and getPublicTransport 在一个 future 中返回两个不同的类型,分别是 RouteByCar 和 PublicTransportAdvice。 这两个值首先放进一个 tuple 值中。 然后这个 tuple 被 map 为一个 TravelAdvice 值。 TravelAdvice 类如下边列表所示。
Listing 5.16 TravelAdvice 类
case class TravelAdvice(routeByCar:Option[RouteByCar] = None,
  publicTransportAdvice: Option[PublicTransportAdvice] = None)
基于这个信息,用户可以决定是开车还是坐公交。 下边的例子展示了如何使用 zip 组合器做到这一点。
Listing 5.17 用 zip 和 map 来组合路线和公交
def getTravelAdvice(info:TicketInfo,
                    event:Event):Future[TicketInfo] = {
  val futureR = callTraffic(info.userLocation,
                            event.location,
                            event.time).recover(withNone)
  val futureP = callPublicTransport(info.userLocation,
                                    event.location,
                                    event.time).recover(withNone)
  futureR.zip(futureP)
         .map {
           case(routeByCar, publicTransportAdvice) =>
            val travelAdvice = TravelAdvice(routeByCar,
                                            publicTransportAdvice)
            info.copy(travelAdvice = Some(travelAdvice))
          }
  }
- 将 Future[RouteByCar] 和 Future[PublicTransportAdvice] zip 进 Future[(RouteByCar, PublicTransportAdvice)]。
- 将路线和公交建议转换为 Future[TicketInfo]。
上述代码首先把公交和汽车路线 future zip 为一个新的 Future, 其中把两个结果放在一个 tuple 中。 然后对组合成的 future 做 map,将结果转换为 Future[TicketInfo],这样就可以进一步串联。 你可以用 for-comprehension 而不是使用 map 方法。 有时这样使得代码可读性更好。 下边的列表展示了如何使用,它与上面列表中的 zip 和 map 所做的完全一样。
Listing 5.18 使用 zip 和 map 来结合汽车和公交建议
for((route, advice) <- futureRoute.zip(futurePublicTransport);
  travelAdvice = TravelAdvice(route, advice)
) yield info.copy(travelAdvice = Some(travelAdvice))
- zip 创建的 future 在某处计算,变成一个 routeByCar 和 publicTransportAdvice 元组。
- for-comprehension 产生 TicketInfo,它作为 Future[TicketInfo] 从 for comprehension 返回,做法与 map 方法类似。
如果你不熟悉 for-comprehensions,可以把它理解成遍历集合(collection)。 在 Future 的例子中,我们遍历一个集合,其包含一个值,或者什么都没有(异常情况时)。
我们要看的流的下一部分是类似活动的建议。 使用了两个 web service;一个类似的返回关于艺术家信息的艺术家服务,与在活动时执行的类似。 艺术家信息用来调用每个艺术家特定的日历服务,来请求活动位置相近的下一个计划的活动,其被用来建议给用户。 下边的列表展示了建议如何建立。
Listing 5.19 对 map 使用 for-comprehension 和 traverse
def getSuggestions(event:Event):Future[Seq[Event]] = {
  val futureArtists = callSimilarArtistsService(event)
  for(artists <- futureArtists
    events <- getPlannedEvents(event, artists)
  ) yield events
}
- 返回 Future[Seq[Events]], 一个包含每个艺术家的计划活动的 future 列表。
- 返回 Future[Seq[Artist]], 与艺术家类似的 Future。
- 'artists' 在某处计算为 Seq[Artist]。
- 'events' 在某处计算为 Seq[Events], 每个被调用的艺术家的计划活动。
- for comprehension 返回 Seq[Event],作为 Future[Seq[Event]]。
上边的例子有点复杂。 为了简洁,代码被分成多个方法,虽然明显可以写在一行里。 一旦有了艺术家,getPlannedEvents 就执行一次。 getPlannedEvents 使用 Future.sequence 方法从 Seq[Future[Event]] 来构建 Future[Seq[Event]]。 换句话说,它将多个 Future 合并为一个包含多个结果的 future。 getPlannedEvents 的代码在如下所示。
Listing 5.20 使用 sequence 把多个 Future[[Event]] 组合成一个 Future[Seq[Event]]
def getPlannedEvents(event:Event, artists:Seq[Artist]) = {
  val events = artists.map { artist=>
    callArtistCalendarService(artist, event.location)
  }
  Future.sequence(events)
}
- 返回 Future[Seq[Event]], 一些计划的活动,每个艺术家一个。
- 对 Seq[Artists] 进行 map。对每个艺术家调用日历服务。 ‘events’ 值是 Seq[Future[Event]]。
- 将 Seq[Future[Event]] 转变为 Future[Seq[Event]]. 当异步的 callArtistCalendarService 调用的所有结果完成时,它最终返回活动列表。
sequence 方法是 traverse 方法的简化版本。 下边的例子展示了当我们使用 traverse 时, getPlannedEvent 的样子。
Listing 5.21 使用 traverse 来将多个 Future[[Event]] 组合成一个 Future[Seq[Event]]
def getPlannedEventsWithTraverse(event:Event, artists:Seq[Artist]) = {
  Future.traverse(artists) { artist=>
    callArtistCalendarService(artist, event.location)
  }
}
traverse 接收一个代码块,其要求返回 Future。 它允许你遍历一个集合,同时创建 future 结果。 使用 sequence,我们首先得创建一个 Seq[Future[Event]],这样我们可以把它转化为 Future[Seq[Event]]。 使用 traverse,我们同样可以做到,只是不需要首先创建 Seq[Futre[Event]] 的中间步骤。
该看看 TicketInfoService 流中最后一步了。 包含天气信息的 TicketInfo 值需要与包含 TravelAdvice 的 TicketInfo 结合。 我们打算使用 fold 方法来将 TicketInfo 结合为一个。
下边的列表展示了如何使用:
Listing 5.22 使用 fold 来把两个 Future[[Event]] 结合为一个 Future[Seq[Event]]
val ticketInfos = Seq(infoWithTravelAdvice, infoWithWeather)
val infoWithTravelAndWeather = Future.fold(ticketInfos)(info) {
  (acc, elem) =>
    val (travelAdvice, weather) = (elem.travelAdvice, elem.weather)
  acc.copy(travelAdvice = travelAdvice.orElse(acc.travelAdvice),
    weather = weather.orElse(acc.weather))
}
- 创建包含旅行建议和天气信息的 TicketInfo 列表。
- 用列表作为参数调用 fold 方法,累加器用只包含活动信息的 ticketInfo 来初始化。
- Fold 返回之前执行的 accumulator('acc') 值中的代码块的结果。它把每个元素传递给代码块,本例中是每个 TicketInfo 值。
- 从 ticketInfo 中提取可选的 travelAdvice 和天气信息。
- 把 travelAdvice 或 天气复制到累加的 TicketInfo 中。拷贝在代码块下一次调用时,作为 'acc' 的下一个值返回。
fold 方法用起来就像 fold 用在你可能熟悉的数据结构 Seq[T] 和 List[T] 一样。 它经常用来代替传统的循环通过在集合上迭代来建立某些数据结构。 fold 的参数是一个集合,一个初始值,和代码块。 代码块针对集合中的每个元素被调用。 代码块需要两个参数,一个用来累加状态,一个是集合中的下一个元素。 在上例中,初始的 TicketInfo 值用来做初始值。 代码块每次迭代,返回 TicketInfo 的拷贝,其中包含更多信息,基于 ticketInfos 列表中的元素。
完整的流展示在下边列表中:
Listing 5.23 完整的 TicketInfoService 流
def getTicketInfo(ticketNr:String,
              location:Location):Future[TicketInfo] = {
  val emptyTicketInfo = TicketInfo(ticketNr, location)
  val eventInfo = getEvent(ticketNr, location)
                  .recover(withPrevious(emptyTicketInfo))
  eventInfo.flatMap { info =>
  val infoWithWeather = getWeather(info)
  val infoWithTravelAdvice = info.event.map { event =>
    getTravelAdvice(info, event)
  }.getOrElse(eventInfo)
  val suggestedEvents = info.event.map { event =>
    getSuggestions(event)
  }.getOrElse(Future.successful(Seq()))
  val ticketInfos = Seq(infoWithTravelAdvice, infoWithWeather)
  val infoWithTravelAndWeather = Future.fold(ticketInfos)(info) {
    (acc, elem) =>
    val (travelAdvice, weather) = (elem.travelAdvice, elem.weather)
    acc.copy(travelAdvice = travelAdvice.orElse(acc.travelAdvice),
              weather = weather.orElse(acc.weather))
    }
  for(info <- infoWithTravelAndWeather;
    suggestions <- suggestedEvents
  ) yield info.copy(suggestions = suggestions)
  }
}
// error recovery functions to minimize copy/paste
type Recovery[T] = PartialFunction[Throwable,T]
// recover with None
def withNone[T]:Recovery[Option[T]] = { case NonFatal(e) => None }
// recover with empty sequence
def withEmptySeq[T]:Recovery[Seq[T]] = { case NonFatal(e) => Seq() }
// recover with the ticketInfo that was built in the previous step
def withPrevious(previous:TicketInfo):Recovery[TicketInfo] = {
  case NonFatal(e) => previous
}
- 首先调用 getEvent, 其返回 Future[TicketInfo]。
- 用天气信息创建一个 TicketInfo。
- 用 TravelAdvice 信息创建 TicketInfo 。
- 获取建议的活动的 future 列表。
- 将天气和旅行组合为一个 TicketInfo。
- 最终增加建议。
- 在 TicketInfoService 流中使用的错误恢复方法。
这就结束了使用 futures 的 TicketInfoService 例子。 如你看到的, futures 可以以多种方式结合,结合器方法使得转换和序列化异步函数结果非常简单。 整个 TicketInfoService 流没有一处阻塞调用。 如果对假定的 web services 的调用使用异步的 HTTP 客户端,如 spray-client 库,实现的,考虑到 I/O,阻塞线程量应该保持在最小数目。 在本书写作时,越来越多用 Scala 写成的针对 I/O 和数据库访问的异步客户库提供了 Future 结果。
下一节中,我们将要看看 futures 如何与 Actors 结合使用。