Akkaのエラーハンドリングストラテジを確認したい

はじめに

Akkaを実運用に突っ込もうと考えたときに一番最初に気になるのはActorがくたばった時の挙動ですよね。 知らない間にActorがくたばっていてメッセージだけが虚空の彼方に消えていったなんて事態になった日には次回の自身のボーナスも虚空に消えかねません。

ボーナスを死守するためにもエラーハンドリングは必須の知識として学ぶべきです。

サンプルコード

今回はこんな感じのサンプルコードを使用します。

package org.jyuch

import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}

import scala.concurrent.duration._

import scala.language.postfixOps

object Hello {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    val calc = system.actorOf(CalcActor.props())

    calc ! Add(1, 2)
    calc ! Add(-1, 2)
    calc ! Add(1, 2)
    calc ! Subtract(3, 2)

    io.StdIn.readLine()
    system.terminate()
  }
}

trait OperandActor extends Actor {
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    //println(reason.getClass.getTypeName)
  }
}

class CalcActor extends OperandActor {
  val add = context.actorOf(AddActor.props(), "add")
  val subtract = context.actorOf(SubtractActor.props(), "subtract")

  def receive = {
    case Result(v) =>{
      println(this + " : " + v)
    }
    case a: Add => add ! a
    case s: Subtract => subtract ! s
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: Exception => Stop
    }
}

object CalcActor {
  def props(): Props = Props[CalcActor]
}

class AddActor extends OperandActor {
  def receive = {
    case Add(x, y) => {
      println(this)
      if (x < 0 || y < 0) throw new Exception()
      sender() ! Result(x + y)
    }
  }
}

object AddActor {
  def props() = Props[AddActor]
}

class SubtractActor extends OperandActor {
  def receive = {
    case Subtract(x, y) => {
      println(this)
      if (x - y < 0) throw new Exception()
      sender() ! Result(x - y)
    }
  }
}

object SubtractActor {
  def props() = Props[SubtractActor]
}

sealed abstract class Operand

case class Add(x: Int, y: Int) extends Operand

case class Subtract(x: Int, y: Int) extends Operand

case class Result(value: Int)

子Actorの後始末

エラーハンドリングストラテジは親ActorのsupervisorStrategyで制御されますが、何も指定しない(デフォルト)とくたばったActorだけを再起動します。

supervisorStrategyで指定できるストラテジは以下の感じになります。

ストラテジ 意味
OneForOneStrategy 子Actorがくたばったらそいつだけ何とかする
AllForOneStrategy 子Actorがくたばったらすべての子Actorを何とかする

で、何とかする内容が以下の感じです。

ストラテジ 意味
Escalate 上位のActorに判断を委譲する
Restart 子Actorを再起動する
Resume エラーを気にせず続行する
Stop 子Actorを停止する

そして、ストラテジをこんな感じに指定します。

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      ⇒ Resume
    case _: NullPointerException     ⇒ Restart
    case _: IllegalArgumentException ⇒ Stop
    case _: Exception                ⇒ Escalate
  }

ガーディアンアクター

上記の指定で子がくたばった時の挙動を制御できるようになりました。じゃあ自身がくたばったらどうなるのさ?

自身がほかのActorの子だったらそいつが決めますが、自身がsystem.actorOfが生成されていた場合は自身の命運はガーディアンアクターが決めます。

doc.akka.io

Akka 2.1以降では設定ファイルのakka.actor.guardian-supervisor-strategyでガーディアンアクターのストラテジを設定出来ます。

doc.akka.io

  actor {

    # Either one of "local", "remote" or "cluster" or the
    # FQCN of the ActorRefProvider to be used; the below is the built-in default,
    # note that "remote" and "cluster" requires the akka-remote and akka-cluster
    # artifacts to be on the classpath.
    provider = "local"

    # The guardian "/user" will use this class to obtain its supervisorStrategy.
    # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator.
    # In addition to the default there is akka.actor.StoppingSupervisorStrategy.
    guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"

現状ではakka.actor.StoppingSupervisorStrategyakka.actor.DefaultSupervisorStrategyが指定できるようです。

おわりに

これならActorがくたばったときにいい感じにActorを再起動できそうです。

これでボーナスを心配することなく安心して今夜も眠ることが出来そうです。

おわり

github.com