5.5 Futures 和 Actors
在《启动和运行》章中,我们用 Spray 建立我们的第一个 REST 服务,它用 Actors 来处理 HTTP 请求。 本章已经展示了 ask 方法返回 Future。 下面给出例子:
Listing 5.24 收集活动信息
import akka.pattern.ask
import context._
implicit val timeout = Timeout(5 seconds)
val capturedSender = sender
def askEvent(ticketSeller:ActorRef): Future[Event] = {
  val futureInt = ticketSeller.ask(GetEvents).mapTo[Int]
  futureInt.map { nrOfTickets =>
    Event(ticketSeller.actorRef.path.name, nrOfTickets)
  }
}
val futures = context.children.map {
  ticketSeller => askEvent(ticketSeller)
}
Future.sequence(futures).map {
  events => capturedSender ! Events(events.toList)
}
- 引入 ask 模式,其为 ActorRef 增加了 ask 方法。
- context 包含 actor 分派器的隐式定义。引入 context 就引入了 actor 的分发器,在隐式范围内作为执行上下文,被 futures 使用。
- 需要为 ask 定义超时。 如果 ask 没有在指定时间内完成, future 就会包含一个超时异常。
- 获取上下文中的发送者引用,存入值中。
- 局部方法定义,用来向 TicketSeller 请求 GetEvents。
- ask 方法返回 Future 结果。 因为 Actors 可以发送回任何返回的 Future 没有输入的消息,我们可以用 mapTo 方法来吧 Future[Any] 转化为 Future[Int]. 如果 actor 以不同于 Int 的其他消息响应, mapTo 会以一个失败的 Future 完成。
- 给捕获的发送者发回消息。捕获的发送者是初始的 GetEvents 请求的发送者,其应当接受应答。
- 询问所有的孩子(TODO:what?)活动还剩多少票。
这个例子应当比第二章中的更清楚。 需要强调的是,这个例子显示了名为 boxOffice 的 actor 可以收集每个售票员剩余的票数。
上例展示了一些重要的细节。 首先我们用一个值捕获了发送者引用。 这是必要的,因为发送者是 actor 上下文的一部分,对于 actor 接收的的每个消息都可能不同。 显然,对于每个 actor 收到的消息,其发送者都可以不同。 因为 future 的回调是个闭包,它闭合了它需要使用的值。 在回调触发的时候,发送者可以有一个完全不同的值。 这就是为什么发送者保存在 capturedSender 值中,这使得应答 GetEvents 请求的发起者成为可能。
当在 Actors 中使用 futures 时要认识到 ActorContext 提供了 Actor 的当前视图。 因为 actors 是有状态的,保证你关闭的值在另一个线程中不可变是很重要的。 阻止这个问题的最简单的办法是使用不可变数据结构,从 future 关闭它前捕获不可变数据结构的引用,就像在上边的 capturedSender 例子中那样。
《启动和运行》中例子使用的另一个模式是 pipeTo。 下边的列表展示了其使用的例子:
Listing 5.25
import akka.pattern.pipe
path("events") {
  get { requestContext =>
    val responder = createResponder(requestContext)
    boxOffice.ask(GetEvents).pipeTo(responder)
  }
}
- 引入 pipe 模式
- 为每个 HTTP 请求创建一个 actor
- ask 的 Future 结果通过管道送给应答 actor
RestInterface 对每个请求创建一个 actor 来处理 HTTP 请求并提供 HTTP 应答。 /events URL 被转化为对 boxOffice 的请求,收集每个活动的剩余票量。 对 boxOffice 请求的结果是 Future[Events]。 当其可用时, Future 最终会包含的 Events 值会被 piped 给应答 actor。 当其收到这个消息时,应答 actor 完成请求,如下边的列表所示:
Listing 5.26
class Responder(requestContext:RequestContext, ticketMaster:ActorRef)
  extends Actor with ActorLogging {
    def receive = {
      case Events(events) =>
      requestContext.complete(StatusCodes.OK, events)
      self ! PoisonPill
      // other messages omitted..
    }
  }
当你想用 actor 来处理 Future 的结果,或者当请求一个 actor 的结果需要用另一个 actor 来实现时, 可以考虑使用 pipeTo 方法。