第二章 启动和运行
这一章的主要内容有:
- 获取一个项目模板
- 为云服务创建一个最小的akka应用程序
- 部署到Heroku
我们的目的是创建一个简单的akka应用.我们首先从github.com上获取我们的实例,然后我们会详细介绍创建akka应用的每一个知识点. 首先我们要知道最小的akka程序需要的依赖,用sbt打包这个程序到jar文件.贯穿本书,我们将建立一个售票系统.本章将建立第一个版本,这个版本我们将建立一个非常小的REST服务, 我们要保证它尽量简单,把焦点放在akka的特性上.最后我们会展示将它部署在Heroku云上是多么的简单.
你很快就会看到一个akka应用的建立和运行是多么的简单,而且运行时的开销是如此的小,我们会忽略一些基础设置的实现细节(例如REST),第8章会详细介绍spray, 在下一章中,我们将结合TDD(Test-Driven Development).
2.1 Clone, Build 和测试接口
为了简单,我们把源码放在github上,我们要做的第一件事情,就是把源码clone到我们的目录. 执行命令:
git clone https://github.com/RayRoestenburg/akka-in-action.git
将会创建akka-in-action目录,里面包含chapter2目录,chapter2中包含了本章的实例.我们假设你已经熟悉git或者github.去附录中查看我们需要安装的工具,这些工具我们后面都会用到,sbt、gi、the heroku toolbelt 和 httpie,附录还包括了流行的IDE,例如Intellij IDEA和Eclips.
让我们来看看工程的结构,SBT的工程跟Maven很像,Maven你可能比较熟悉,最主要的区别是,stb允许用scala来构建工程,因为sbt内部有一个解释器,这就使得它非常强大.关于SBT的更多细节请查看"SBT in Action",第二章的所有代码在src/main/scala中,配置和资源文件在src/main/resources,测试相关的源码和配置在src/test/scala和src/test/resources中,工程很好编译,在chapter2目录下运行下面的命令,插着手等着就行:
sbt assembly
sbt会把所有依赖都打包进target/scala-2.10/goticks-server.jar,你可以运行这个程序,通过下面这个命令:
java -jar target/scala-2.10/goticks-server.jar
如果你非常猴急,已经运行了程序,你会看到像下面这样的输出:
Slf4jEventHandler started
akka://com-goticks-Main/user/io-bridge started
akka://com-goticks-Main/user/http-server started on /0.0.0.0:5000
现在已经验证工程创建正确,下面我将介绍它做了上面。下一节我们首先介绍构建文件,然后是资源文件和源代码.
2.1.1 SBT构建工程
首先让我们看看构建文件。这一章我们使用简单的SBT DSL构建文件,因为这就足够了。代码很紧凑,因为我们采用的不是akka原生的堆栈而是spray。 当然,随着我们的进展,我们将添加更多依赖,或者配置构建支持不同的目标环境(如开发,测试,产品),你将会很快看到这些,而且不需要模板的帮助,也不需要通过剪切复制其他项目。如果你之前没有用过SBT DSL,需要注意的是,构建文件行与行之间需要一个空行(用来表示scala表达式的结束)。构建文件直接位于chapter2目录下,文件名build.sbt。构建文件以两个import开始:
import AssemblyKeys._
import com.typesafe.startscript.StartScriptPlugin
这些imports需要两个SBT插件,第一个插件用于打包jar文件,例如之前的goticks-server.jar.这个jar包含源码和依赖.第二个是Heroku部署需要的,在后面的章节我们会看到.接下来是一些工程的细节,如程序的名称、版本、组织和scala的版本:
name := "goticks"
version := "0.1-SNAPSHOT"
organization := "com.goticks"
scalaVersion := "2.10.0"
下面的代码用了指定从哪个Maven仓库下载依赖:
resolvers ++= Seq("repo" at "http://repo.typesafe.com/typesafe/releases/",
"Spray Repository" at "http://repo.spray.io",
"Spray Nightlies" at "http://nightlies.spray.io/")
resolvers只是一系列的Maven仓库,将由SBT来解析。正如你所看到的,我们添加了typesafe和spray库(构建工具会从那里下载所有的Lib).接下来是依赖的库:
libraryDependencies ++= {
val akkaVersion = "2.3.4"
val sprayVersion = "1.3.1"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"io.spray" %% "spray-can" % sprayVersion,
"io.spray" %% "spray-routing" % sprayVersion,
"io.spray" %% "spray-json" % "1.2.6",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"ch.qos.logback" % "logback-classic" % "1.1.2",
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "2.2.0" % "test"
)
}
SBT会根据libraryDependencies的列表从Maven仓库下载依赖的Lib,每个lib的格式是:组织%模块%版,(%%是根据scala版本自动使用正确的版本).这里最重要的依赖是akka-actor,构建文件会为插件推断一些设置,这些插件我们后面章节会介绍,现在我们可以在chapter2目录下执行下面的命令,进行编译、运行测试:
sbt clean compile test
下一节会详细介绍这个例子的细节.
2.1.2 快速浏览一下GoTicks.com 的REST服务
我们的售票系统可以卖各种比赛的票,比如音乐会、体育比赛等。假设我们是一家名叫GoTicks.com的新型公司,现在的头等大事是建立后台的REST服务。 现在我们希望用户可以获取到总的票数。如果所有的票都买完,我们就给用户回应一个404的http错误(Not Found)。 现在的第一件事情就是定义添加新票的接口,因为其他所有服务都要求系统中有票。一个赛事包含赛事的名字和总的票数。
所有的REST接口:
Table 2.1 m REST API
| Tables | Are | Cool |
| ------------- |:-------------:| -----:|
| col 3 is | right-aligned | $1600 |
| col 2 is | centered | $12 |
| zebra stripes | are neat | $1 |
让我们编译程序并在SBT中运行它。到chapter2目录下,执行下面的命令:
sbt run
售票系统会在SBT中自动运行,你会看到像下面这样的输出:
akka://com-goticks-Main/user/http-server started on /0.0.0.0:5000
如果你出错,看看你是否在其他控制台下面已经启动了售票系统,或者其他程序占用了5000端口。 让我们通过httpie(可以读取http请求的命令行工具,更多内容请查看附件)来查看每一项工作,首先让我们看看如何创建一项赛事:
http PUT localhost:5000/events event=RHCP nrOfTickets:=10
上面那条命令创建了一个JSON的请求,请求的内容包含event(赛事名,后面统一用event,不再解释,nrOfTickets也是)和nrOfTickets(票数):{ event: "RHCP", nrOfTickets : 10} 这个请求由我们的售票系统来处理然后返回给httpie响应,httpie把响应打印出来。你会看看到像下面这样的响应消息:
HTTP/1.1 200 OK
Content-Length: 2
Content-Type: text/plain
Date: Tue, 16 Apr 2013 11:22:48 GMT
Server: GoTicks.com REST API
OK
我们再来创建另外一个赛事:
http PUT localhost:5000/events event=DjMadlib nrOfTickets:=15
我们试一下GET请求
http GET localhost:5000/events
会得到像下面这样的响应
HTTP/1.1 200 OK
Content-Length: 92
Content-Type: application/json; charset=UTF-8
Date: Tue, 16 Apr 2013 12:39:10 GMT
Server: GoTicks.com REST API
[
{
"event": "DjMadlib",
"nrOfTickets": 15
},
{
"event": "RHCP",
"nrOfTickets": 10
}
]
我们看到了两个赛事,并且所有票都可买。现在我们卖一张RHCP赛事的票:
http GET localhost:5000/ticket/RHCP
系统分配一张票给我们,下面是我们的票(仍然是JSON格式):
HTTP/1.1 200 OK
Content-Length: 32
Content-Type: application/json; charset=UTF-8
Date: Tue, 16 Apr 2013 12:40:00 GMT
Server: GoTicks.com REST API
{
"event": "RHCP",
"nr": 1
}
如果我们再次发送GET所有赛事的请求,我们可能会得到下面这样的响应:
HTTP/1.1 200 OK
Content-Length: 91
Content-Type: application/json; charset=UTF-8
Date: Tue, 16 Apr 2013 12:41:42 GMT
Server: GoTicks.com REST API
[
{
"event": "DjMadlib",
"nrOfTickets": 15
},
{
"event": "RHCP",
"nrOfTickets": 9
}
]
跟期望的一样,已经剩9张RHCP赛事的票了,如果你重复GET /ticket/RHCP 11次,你会得到404响应:
HTTP/1.1 404 Not Found
Content-Length: 83
Content-Type: text/plain
Date: Tue, 16 Apr 2013 12:42:57 GMT
Server: GoTicks.com REST API
The requested resource could not be found
but may be available again in the future.
上面已经包含了所有的REST API的调用,显然我们的应用支持基本的CRUD周期事件,我们先创建关于一下赛事的票,然后售出,直到售完为止。 我们的策略是不全面的,例如:我们没有考虑什么时候不再买票,谁的票不可用了,如果比赛已经开始了。让我们在下一节看看实现的细节。
2.2应用程序中的Actors
在这一节我们看看这样建立一个应用程序。 你可以自己创建一个Actors程序,或者从github上获取。 就像你现在知道的,Actors包含四种操作:create, send/receive, become和 supervise。在这个例子中,我们只讲前两个。 首先我们看看整体的结构:操作是如何通过不同的合作者提供的核心功能而被执行的,
2.2.1应用程序的结构
我们的应用程序包含三个actor类。我们首先要建立一个actor system,它包含所有的actor,然后actor可以再创建其他的actor,像下图所示:

REST接口的actor会处理HTTP请求。它基本上是个HTTP适配器:它主要负责JSON转换和HTTP响应。即使在这个简单的例子中,我们也能够看到大量的协作者,每个都完成特定的职责。TicketSeller跟踪一个特定的赛事并且卖票。下图表明了如何通过Actor系统创建一个基于事件流的请求(这是上面的第一个服务):
我们上面讨论的第二个服务是客户买票,下图显示了当收到这样的请求(JSON格式)时的流程:
这是这个小应用的最重要流程:我们的业务就是卖票。这也是我们在现实应用中,压力最大的一个流程。
让我们整体看一下代码,首先我们先看主类,因为所有的启动都从它开始。跟普通的Scala程序的主类一样,可以跟运行Scala应用程序一样运行,跟Java一样有个main方法。下面显示了主类(我这是最新的代码,跟E文里面的代码不太一样,后面的注释都是老代码的,如果此处有变动,我后面会加入一些解释):
package com.goticks
import akka.actor._
import akka.io.IO
import spray.can.Http
import com.typesafe.config.ConfigFactory
object Main extends App { // 1
val config = ConfigFactory.load() // 2
val host = config.getString("http.host") // 3
val port = config.getInt("http.port")
implicit val system = ActorSystem("goticks")
val api = system.actorOf(Props(new RestInterface()), "httpInterface") // 4
IO(Http) ! Http.Bind(listener = api, interface = host, port = port) // 5
}
spray-can应用程序(老代码会混入SprayCanHttpServerApp特质,所以说是spray-can应用程序)
- 加载配置文件
- 获取HTTP的配置值
- 创建顶级的REST接口的Actor
- 启动HTTP服务(因为1的修改,这里也有修改,老代码是newHttpServer(api) ! Bind(interface = host, port = port))
主类继承子APP,我们可以引入我们的配置参数并且把它们用在我们的REST接口中,REST接口负责处理请求,你不需要知道Spray是如何实现这些的(后面我们会详细介绍)。 在主类的最后一行启动实际的服务并绑定特定的地址和端口,应用程序内部Actor之间通过下面的消息进行通信:
case class Event(event:String, nrOfTickets:Int)
case object GetEvents
case class Events(events:List[Event])
case object EventCreated
case class TicketRequest(event:String)
case object SoldOut
case class Tickets(tickets:List[Ticket])
case object BuyTicket
case class Ticket(event:String, nr:Int)
- 创建一个赛事
- 获取所有赛事
- 获取所有赛事的响应
- 表示一个赛事已经创建了
- 请求一张票
- 赛事的票已经卖出
- BoxOffice创建一类新票的消息
- 买票消息
票的信息,名字和编号
像典型的REST应用,我们有一个围绕核心实体生命周期的界面:赛事和票。 AKKA将会获取它们跟不变的消息一起发送出去,所以Actor必须能够获取到它们所需要的所有信息,并且能够输出它的合作者需要的信息。 这很适合REST。 下一节我们将看到Actor的更多细节。 我们首先从TicketSeller开始。
2.2.2 买票的Actor:TicketSeller
TicketSeller由BoxOffice创建,用来保存票的列表。每次请求要一张票,都从队列的头取一张票。下面是TicketSeller的代码:
package com.goticks
import akka.actor.{PoisonPill, Actor}
class TicketSeller extends Actor {
import TicketProtocol._
var tickets = Vector[Ticket]() // 1
def receive = {
case GetEvents => sender ! tickets.size // 2
case Tickets(newTickets) => tickets = tickets ++ newTickets // 3
case BuyTicket =>
if (tickets.isEmpty) { // 4
sender ! SoldOut
self ! PoisonPill
}
tickets.headOption.foreach { ticket =>
tickets = tickets.tail
sender ! ticket
}
}
}
- 门票的列表
- 收到GetEvents消息时,返回列表的大小
- 收到Tickets消息时,把收到票加入列表
如果没有票了,返回SoldOut消息,并且杀死自己,如果有票,就从队列头取出一张,返回给请求的Actor
TicketSeller在无票可售时,会给自己发送 PoisonPill(毒丸)消息,把自己毒死,这使得Actor的清理工作很容易。下一节我们看看BoxOffice
2.2.3 BoxOffice(票房,或者理解为售票大厅)
BoxOffice为每一种票创建一个TicketSeller,下面的代码是对Event消息的响应流程:
case Event(name, nrOfTickets) =>
log.info(s"Creating new event ${name} with ${nrOfTickets} tickets.")
if(context.child(name).isEmpty) { // 如果这种类型的票没有,就创建
val ticketSeller = createTicketSeller(name)
val tickets = Tickets((1 to nrOfTickets).map(nr=> Ticket(name, nr)).toList)
ticketSeller ! tickets
}
sender ! EventCreated
trait CreateTicketSellers { self:Actor =>
def createTicketSeller(name:String) = context.actorOf(Props[TicketSeller], name)
}
BoxOffice为每种票创建一个TicketSeller,注意:CreateTicketSellers中用的是context的actorOf方法来创建TicketSeller的,而不是system; 用一个Actor的context创建的Actor,就是创建了一个自己的一个孩子,父Actor可以监控子Actor。 BoxOffice创建一个票的列表,把它交给TicketSeller来销售,而且会对下达创建新票的Actor(RestInterface Actor)发送响应消息EventCreated。 下面的代码是对TicketRequest消息的响应流程:
case TicketRequest(name) =>
log.info(s"Getting a ticket for the ${name} event.")
context.child(name) match {
case Some(ticketSeller) => ticketSeller.forward(BuyTicket)
case None => sender ! SoldOut
}
BoxOffice根据票的名字查找TicketSeller,如果找到了,就给它发送BuyTicket消息,TicketSeller对BuyTicket消息的处理我们上面已经说过了; 如果没有找到,就说明票已经卖完了,响应SoldOut消息。 下一个消息的处理有点复杂,我们询问所有的TicketSeller还有多少票,然后将所得到的结果组成列表返回,有趣的是我们不想等待每一个TicketSeller的回答而使得BoxOffice阻塞,所以我们引入了ask方法,ask是一个异步的操作。
下面的代码我们会用到Future(期货,这种专业术语就不翻译了),Future会在第7章介绍,我们暂时先跳过。让我们看看下面的代码:
case GetEvents =>
import akka.pattern.ask
val capturedSender = sender
def askAndMapToEvent(ticketSeller:ActorRef) = { // 1
val futureInt = ticketSeller.ask(GetEvents).mapTo[Int] // 2
futureInt.map(nrOfTickets => Event(ticketSeller.actorRef.path.name, nrOfTickets)) // 3
}
val futures = context.children.map(ticketSeller => askAndMapToEvent(ticketSeller)) // 4
Future.sequence(futures).map { events =>
capturedSender ! Events(events.toList)
}
- 定义一个本地方法,给TicketSeller发送GetEvents消息,获取现在剩余多少票。
- 用ask发送GetEvents消息,ask是异步的,响应会在将来的一个时间点赋值给futureInt变量(一个Future变量)。
- 把Future变量转换为Evnet消息。
给所有子Actor发送消息,询问还剩多少票。
现在让我们抛开例子看看概念。这里发生了什么?我们调用ask函数立即返回,返回值是Future变量,futurn是一个未来的期待值,所以叫future。 返回的future值我们没有直接访问,而是定义了等future变为可用时的动作。 我们甚至可以定义等所有这些future都完成时,把future的列表转换为值的列表。 等所有ask的响应都收到后,发送GetEvents的响应消息Events(接收者是RestInterface Actor)。 如果还是不明白,不要着急,因为我们会有一整章介绍这些功能。这里我们只是想让你对这个很牛逼的功能感到好奇,如果你实在等不及,想了解一下这些非阻塞的异步操作,那就去看看第7章吧。 我们还剩一个actor没有介绍,就是REST接口,我们下一节介绍。
2.2.4 REST接口
REST接口用的是Spray的路由 DSL,第9章会介绍。服务的接口会用到越来越多的复杂的规则(routing是不是应该翻译成规则?)。 当我们刚开始创建这个售票系统时,只是简单的卖票,规则特别简单。让我们通过下面的列表看看这些简单的规则。 首先REST接口需要处理Event请求:
列表2.6 创建BoxOffice Actor
val BoxOffice = context.actorOf(Props[BoxOffice])
REST接口创建一个BoxOffice Actor作为自己的子Actor。它也创建了一个临时的响应Actor,这个Actor的生命周期就是一个HTTP请求:
**列表2.7 创建临时的响应Actor
def createResponder(requestContext:RequestContext) = {
context.actorOf(Props(new Responder(requestContext, BoxOffice)))
}
响应Actor发送消息给BoxOffice,并且处理TicketSeller和BoxOffice的响应消息。下面是一段处理HTTP请求的DSL代码:
××列表2.8 Spray Route Definition (DSL)
path("ticket") {
get {
entity(as[TicketRequest]) { ticketRequest => requestContext =>
val responder = createResponder(requestContext)
boxOffice.ask(ticketRequest).pipeTo(responder)
}
}
} ~
请求实体被解析为TicketRequest对象。通过调用BoxOffice的ask函数,并把结果pile给响应Actor, 当响应Actor收到TicketSeller的Ticket或者SoldOut消息时,对HTTP请求进行响应:
**列表2.9 响应Actor处理请求完成:响应
class Responder(requestContext:RequestContext, ticketMaster:ActorRef) extends Actor with ActorLogging {
import TicketProtocol._
import spray.httpx.SprayJsonSupport._
def receive = {
case ticket:Ticket =>
requestContext.complete(StatusCodes.OK, ticket)
self ! PoisonPill
case EventCreated =>
requestContext.complete(StatusCodes.OK)
self ! PoisonPill
case SoldOut =>
requestContext.complete(StatusCodes.NotFound)
self ! PoisonPill
case Events(events) =>
requestContext.complete(StatusCodes.OK, events)
self ! PoisonPill
}
}
响应Actor在处理完响应消息后,给自己发送PoisonPill(毒丸)消息,然后就退出了,它的生命周期就是一个HTTP请求。 响应消息会自动转换为JSON格式。 Spray这一章有个实现的所有细节。 这就是第一个版本的售票系统的所有Actor。 你现在已经知道如何创建一个带有全部REST接口的异步akka Actor应用了。 应用是微不足道的,但是我们已经使所有的处理都是并行的,而且是可伸缩(因为它是并行的)和容错(我们后面会看到)的。 这个例子也表明我们可以用异步方式来处理HTTP的同步请求和响应。 相比传统的方式,我们的代码也是相当的少。 在下一节我们将如何把这个小应用部署到云上。
2.3 云
2.4 总结
在这一章,我们介绍了如何用很少的东东创建一个全功能的REST系统。所有的交互都是异步的。用httpie命令工具进行测试,所有服务都正确。 我们部署我们的应用到云, 当然我们的售票系统还无法部署在生产环境。 我们没有持久化,应用被部署在云上,但是web dynos可以随时更换,所以在内存中保存门票,在显示生活中很容易就无法工作了。 应用的没有扩展到多节点。
但是我保证在后面的章节中我们会逐渐接近一个真实的系统。下一章我们将学习如何测试Actor系统。

