自定义策略
每个应用都会制定不同的策略来容错, 就像前面我们看到的那四个supervisor可以采取不同的策略来处理actor失败的情况。 这时我们就要用的构件块(building blocks)。 这一节我们会回到日志处理程序,为不同的部分建立不同的策略:
1. 忽略错误,恢复孩子,用相同的actor继续处理。
2. 重启孩子,移除失败的actor,并用新的actor实例替换。
3. 停止孩子,永远终止。
4. 上报故障,由父actor来决定如何处理。
首先看看日志处理程序可能发生的异常。 为了简化,自定义几个异常:
Listing 4.12 Exceptions in the log processing application
@SerialVersionUID(1L)
class DiskError(msg: String)
extends Error(msg) with Serializable // 1
@SerialVersionUID(1L)
class CorruptedFileException(msg: String, val file: File)
extends Exception(msg) with Serializable // 2
@SerialVersionUID(1L)
class DbBrokenConnectionException(msg: String)
extends Exception(msg) with Serializable // 3
1. 磁盘发生不可恢复的故障。
2. 日志文件损坏,无法处理。
3. 数据库连接断开异常。
在日志处理程序中actor之间发送消息的协议:
Listing 4.13 Log processing protocol
object LogProcessingProtocol {
// represents a new log file
case class LogFile(file: File) // 1
// A line in the log file parsed by the LogProcessor Actor
case class Line(time: Long, message: String, messageType: String) // 2
}
1. 从FileWatcher收到的日志文件,log Processor会处理它。
2. 日志文件中处理过的一行日志,database writer会把它写到数据库。
首先让我们看看最底层的database writer收到DbBrokenConnectionException异常, 当这个异常发生时,我们应该重启dbWriter。
Listing 4.14 DbWriter crash
class DbWriter(connection: DbCon) extends Actor {
import LogProcessingProtocol._
def receive = {
case Line(time, message, messageType) =>
connection.write(Map('time -> time,
'message -> message,
'messageType -> messageType)) // 1
}
}
1. 在这个连接上写数据时,可能导致actor崩溃。
DbWriter由DbSupervisor监管。supervisor会把消息转发给DbWriter。
Listing 4.15 DbWriter supervisor
class DbSupervisor(writerProps: Props) extends Actor {
override def supervisorStrategy = OneForOneStrategy() {
case _: DbBrokenConnectionException => Restart // 1
}
val writer = context.actorOf(writerProps) // 2
def receive = {
case m => writer forward (m) // 3
}
}
1. 收到DbBrokenConnectionException后重启。
2. supervisor通过Props创建dbWriter。
3. supervisor收到的消息,都转发给dbWriter。
如果数据库连接断开,会用新的连接重建database writer。 DbBrokenConnectionException发生时,正在处理的行丢失。 后面我们会看到如何解决这个问题。 上一个层级是logProcessor。 让我们看看如何来监管这个actor,下面的代码显示了logProcessor:
Listing 4.16 LogProcessor crash
class LogProcessor(dbSupervisor: ActorRef)
extends Actor with LogParsing {
import LogProcessingProtocol._
def receive = {
case LogFile(file) =>
val lines = parse(file) // 1
lines.foreach(dbSupervisor ! _) // 2
}
}
1. 解析日志,可能会崩溃。
2. 发送解析后的一行日志给dbSupervisor,dbSupervisor会转发给dbWriter。
当日志文件损坏时,logPprcessor就会崩溃。 这种情况下,我们不想在继续处理此文件,所以我们忽略它。 logProcessor的supervisor恢复失败的actor:
Listing 4.17 LogProcessor supervisor
class LogProcSupervisor(dbSupervisorProps: Props)
extends Actor {
override def supervisorStrategy = OneForOneStrategy() {
case _: CorruptedFileException => Resume // 1
}
val dbSupervisor = context.actorOf(dbSupervisorProps) // 3
val logProcProps = Props(new LogProcessor(dbSupervisor))
val logProcessor = context.actorOf(logProcProps) // 2
def receive = {
case m => logProcessor forward (m) // 4
}
}
1. 发生CorruptedFileException异常时,恢复。忽略出错的文件。
2. 创建logProcessor。
3. 创建database supervisor,并且监管。
4. 转发所有消息到logProcessor。
LogProcSupervisor不用关系database supervisor依赖的任何东西。 它只是简单的使用Props对象。 如果LogProcSupervisor决定重启,它只需要通过Props进行重建actor就可以了。
再上一层,FileWatcher:
Listing 4.18 FileWatcher crash
class FileWatcher(sourceUri: String,
logProcSupervisor: ActorRef)
extends Actor with FileWatchingAbilities {
register(sourceUri) // 1
import FileWatcherProtocol._
import LogProcessingProtocol._
def receive = {
case NewFile(file, _) => // 2
logProcSupervisor ! LogFile(file) // 3
case SourceAbandoned(uri) if uri == sourceUri =>
self ! PoisonPill // 4
}
}
1. 在watchingAPI中注册观察源的uri。
2. 当有新文件产生时,watchingAPI发送消息过来。
3. 转发给logProcSupervisor。
4. 当源被放弃,不再需要从此获取更多的日志文件,杀死自己。
我们无法知道watchingAPI的细节,它由FileWatchingAbilities trait提供。 FileWatcher不会有上面危险的行为,所以会一直运行直到 watching API告诉他放弃。 FileWatchingSupervisor监管FileWatcher的终止,它会处理所有在下层supervisor发生的DistError。 由于下层supervisor没有定义对DiskError的处理,所以它会自动发往上层supervisor。 由于这是一个无法恢复的错误,所以FileWatchingSupervisor决定停止所有actor。 AllForOneStrategy会被使用。所以一旦发生DiskError,所有的watcher都会被停止。
Listing 4.19 FileWatcher supervisor
class FileWatchingSupervisor(sources: Vector[String],
logProcSuperProps: Props)
extends Actor {
var fileWatchers: Vector[ActorRef] = sources.map { source =>
val logProcSupervisor = context.actorOf(logProcSuperProps)
val fileWatcher = context.actorOf(Props(
new FileWatcher(source, logProcSupervisor)))
context.watch(fileWatcher) // 1
fileWatcher
}
override def supervisorStrategy = AllForOneStrategy() {
case _: DiskError => Stop // 2
}
def receive = {
case Terminated(fileWatcher) => // 3
fileWatchers = fileWatchers.filterNot(w => w == fileWatcher)
if (fileWatchers.isEmpty) self ! PoisonPill // 4
}
}
1. 监控fileWatcher的状态。
2. 发生DiskError时,停止所有fileWatcher。
3. 接受到停止观察者的消息。
4. 当所有的fileWatcher都停止了,杀死自己。
默认情况下OneForOneStrategy和AllForOneStrategy会不断重试。 这两种策略有默认的构造函数参数:maxNrOfRetries和withinTimeRange。 某些情况下你可能想在一定次数或者时间以后停止策略。 一旦配置了这些限制,而在知道的次数或者时间内没有解决问题,故障会被升级。 下面的代码给出了一个database supervisor实例:
Listing 4.20 Impatient database supervisor
class DbImpatientSupervisor(writerProps: Props) extends Actor {
override def supervisorStrategy = OneForOneStrategy(
maxNrOfRetries = 5,
withinTimeRange = 60 seconds) { // 1
case _: DbBrokenConnectionException => Restart
}
val writer = context.actorOf(writerProps)
def receive = {
case m => writer forward (m)
}
}
1. 如果问题在60秒或者重试5次以后,没有解决,就把问题升级。
这种机制可以用来防止一个actor连续重启而没有任何效果。