自定义策略
每个应用都会制定不同的策略来容错, 就像前面我们看到的那四个supervisor可以采取不同的策略来处理actor失败的情况。 这时我们就要用的构件块(building blocks)。 这一节我们会回到日志处理程序,为不同的部分建立不同的策略:
1.  忽略错误,恢复孩子,用相同的actor继续处理。
2.  重启孩子,移除失败的actor,并用新的actor实例替换。
3.  停止孩子,永远终止。
4.  上报故障,由父actor来决定如何处理。
首先看看日志处理程序可能发生的异常。 为了简化,自定义几个异常:
Listing 4.12 Exceptions in the log processing application
    @SerialVersionUID(1L)
    class DiskError(msg: String)
        extends Error(msg) with Serializable // 1
    @SerialVersionUID(1L)
    class CorruptedFileException(msg: String, val file: File)
        extends Exception(msg) with Serializable // 2
    @SerialVersionUID(1L)
    class DbBrokenConnectionException(msg: String)
        extends Exception(msg) with Serializable // 3
1.  磁盘发生不可恢复的故障。
2.  日志文件损坏,无法处理。
3.  数据库连接断开异常。
在日志处理程序中actor之间发送消息的协议:
Listing 4.13 Log processing protocol
    object LogProcessingProtocol {
        // represents a new log file
        case class LogFile(file: File) // 1
        // A line in the log file parsed by the LogProcessor Actor
        case class Line(time: Long, message: String, messageType: String) // 2
    }
1.  从FileWatcher收到的日志文件,log Processor会处理它。
2.  日志文件中处理过的一行日志,database writer会把它写到数据库。
首先让我们看看最底层的database writer收到DbBrokenConnectionException异常, 当这个异常发生时,我们应该重启dbWriter。
Listing 4.14 DbWriter crash
    class DbWriter(connection: DbCon) extends Actor {
        import LogProcessingProtocol._
        def receive = {
        case Line(time, message, messageType) =>
        connection.write(Map('time -> time,
        'message -> message,
        'messageType -> messageType)) // 1
        }
    }
1.  在这个连接上写数据时,可能导致actor崩溃。
DbWriter由DbSupervisor监管。supervisor会把消息转发给DbWriter。
Listing 4.15 DbWriter supervisor
    class DbSupervisor(writerProps: Props) extends Actor {
        override def supervisorStrategy = OneForOneStrategy() {
            case _: DbBrokenConnectionException => Restart // 1
        }
        val writer = context.actorOf(writerProps) // 2
        def receive = {
        case m => writer forward (m) // 3
        }
    }
1.  收到DbBrokenConnectionException后重启。
2.  supervisor通过Props创建dbWriter。
3.  supervisor收到的消息,都转发给dbWriter。
如果数据库连接断开,会用新的连接重建database writer。 DbBrokenConnectionException发生时,正在处理的行丢失。 后面我们会看到如何解决这个问题。 上一个层级是logProcessor。 让我们看看如何来监管这个actor,下面的代码显示了logProcessor:
Listing 4.16 LogProcessor crash
    class LogProcessor(dbSupervisor: ActorRef)
        extends Actor with LogParsing {
        import LogProcessingProtocol._
        def receive = {
            case LogFile(file) =>
                val lines = parse(file) // 1
                lines.foreach(dbSupervisor ! _) // 2
        }
    }
1.  解析日志,可能会崩溃。
2.  发送解析后的一行日志给dbSupervisor,dbSupervisor会转发给dbWriter。
当日志文件损坏时,logPprcessor就会崩溃。 这种情况下,我们不想在继续处理此文件,所以我们忽略它。 logProcessor的supervisor恢复失败的actor:
Listing 4.17 LogProcessor supervisor
    class LogProcSupervisor(dbSupervisorProps: Props)
        extends Actor {
        override def supervisorStrategy = OneForOneStrategy() {
            case _: CorruptedFileException => Resume // 1
        }
        val dbSupervisor = context.actorOf(dbSupervisorProps) // 3
        val logProcProps = Props(new LogProcessor(dbSupervisor))
        val logProcessor = context.actorOf(logProcProps) // 2
        def receive = {
            case m => logProcessor forward (m) // 4
        }
    }
1.  发生CorruptedFileException异常时,恢复。忽略出错的文件。
2.  创建logProcessor。
3.  创建database supervisor,并且监管。
4.  转发所有消息到logProcessor。
LogProcSupervisor不用关系database supervisor依赖的任何东西。 它只是简单的使用Props对象。 如果LogProcSupervisor决定重启,它只需要通过Props进行重建actor就可以了。
再上一层,FileWatcher:
Listing 4.18 FileWatcher crash
    class FileWatcher(sourceUri: String,
                logProcSupervisor: ActorRef)
        extends Actor with FileWatchingAbilities {
        register(sourceUri) // 1
        import FileWatcherProtocol._
        import LogProcessingProtocol._
        def receive = {
          case NewFile(file, _) => // 2
            logProcSupervisor ! LogFile(file) // 3
          case SourceAbandoned(uri) if uri == sourceUri =>
            self ! PoisonPill // 4
        }
    }
1.  在watchingAPI中注册观察源的uri。
2.  当有新文件产生时,watchingAPI发送消息过来。
3.  转发给logProcSupervisor。
4.  当源被放弃,不再需要从此获取更多的日志文件,杀死自己。
我们无法知道watchingAPI的细节,它由FileWatchingAbilities trait提供。 FileWatcher不会有上面危险的行为,所以会一直运行直到 watching API告诉他放弃。 FileWatchingSupervisor监管FileWatcher的终止,它会处理所有在下层supervisor发生的DistError。 由于下层supervisor没有定义对DiskError的处理,所以它会自动发往上层supervisor。 由于这是一个无法恢复的错误,所以FileWatchingSupervisor决定停止所有actor。 AllForOneStrategy会被使用。所以一旦发生DiskError,所有的watcher都会被停止。
Listing 4.19 FileWatcher supervisor
    class FileWatchingSupervisor(sources: Vector[String],
                                 logProcSuperProps: Props)
      extends Actor {
        var fileWatchers: Vector[ActorRef] = sources.map { source =>
            val logProcSupervisor = context.actorOf(logProcSuperProps)
            val fileWatcher = context.actorOf(Props(
                new FileWatcher(source, logProcSupervisor)))
            context.watch(fileWatcher) // 1
            fileWatcher
        }
        override def supervisorStrategy = AllForOneStrategy() {
            case _: DiskError => Stop // 2
        }
        def receive = {
            case Terminated(fileWatcher) => // 3
                fileWatchers = fileWatchers.filterNot(w => w == fileWatcher)
                if (fileWatchers.isEmpty) self ! PoisonPill // 4
        }
    }
1.  监控fileWatcher的状态。
2.  发生DiskError时,停止所有fileWatcher。
3.  接受到停止观察者的消息。
4.  当所有的fileWatcher都停止了,杀死自己。
默认情况下OneForOneStrategy和AllForOneStrategy会不断重试。 这两种策略有默认的构造函数参数:maxNrOfRetries和withinTimeRange。 某些情况下你可能想在一定次数或者时间以后停止策略。 一旦配置了这些限制,而在知道的次数或者时间内没有解决问题,故障会被升级。 下面的代码给出了一个database supervisor实例:
Listing 4.20 Impatient database supervisor
    class DbImpatientSupervisor(writerProps: Props) extends Actor {
        override def supervisorStrategy = OneForOneStrategy(
            maxNrOfRetries = 5,
            withinTimeRange = 60 seconds) { // 1
            case _: DbBrokenConnectionException => Restart
        }
        val writer = context.actorOf(writerProps)
        def receive = {
            case m => writer forward (m)
        }
    }
1.  如果问题在60秒或者重试5次以后,没有解决,就把问题升级。
这种机制可以用来防止一个actor连续重启而没有任何效果。