Akkaでもテストを実施したい
はじめに
あけましておめでとうございます
今年の目標も前年と同様に「生き抜く」を主軸としてなんかいい感じに頑張ります。
今回はAkkaのテストについてです。
今回の内容はほとんどAkka実践バイブル(翔泳社 2017年)から参考にさせて頂いた*1のですが、ActorSystem
のシャットダウン方法が2.4でdeprecated
入りしたActorSystem.shutdown
を使っている所に若干の不安を抱きますがまぁ英語版の書籍が出て日本語訳が出るまでにタイムラグが発生すんだからまぁ仕方ないよねと言った感じです。
最終的に何が言いたいのかというと、素晴らしい本をありがとうございます。
概要
Akka(というよりはアクターモデル)ではActorにメッセージを送信し、メッセージを受け取ったActorは自身の内部状態を変化させたり受信したメッセージをそのままもしくは改変して別のActorに送信します。(データベースやKafkaのようなメッセージシステムに出力する場合もありますが、今回はその辺はやりません。) 逆に言うとActorはメッセージを送らない限り何もしません。
そこで、テスト用のActorSystem
を作成し、そのActorSystem
内でActorにメッセージを送信し返信メッセージもしくは内部状態を捕捉してAssertするのが基本戦略になるっぽいです。
ヘルパトレイト
ここではテストケースが終了したタイミングをフックしてActorSystem
を終了するトレイトをあらかじめ定義しておきます。
import org.scalatest.{Suite, BeforeAndAfterAll} import akka.testkit.TestKit trait StopSystemAfterAll extends BeforeAndAfterAll { this :TestKit with Suite => override protected def afterAll(): Unit = { super.afterAll() system.terminate() } }
内部状態のアサート
ここではSumActor
という、受信した値の合計をひたすら加算するActorのテストを想定します。
import akka.actor.{Actor, ActorRef} import jyuch.SumActor.{GetSum, Operand} class SumActor extends Actor { var sum: Int = 0 override def receive: Receive = { case Operand(value) => { sum += value } case GetSum(sender) => sender ! sum } def currentSum = sum } object SumActor { case class Operand(value: Int) case class GetSum(sender: ActorRef) }
import akka.actor.{ActorSystem, Props} import akka.testkit.{TestActorRef, TestKit} import jyuch.SumActor.{GetSum, Operand} import org.scalatest.{MustMatchers, WordSpecLike} class SumActorTest extends TestKit(ActorSystem("testsystem")) with WordSpecLike with MustMatchers with StopSystemAfterAll { "SumActor" must { "change internal value when receives a message" in { val sumActor = TestActorRef[SumActor] sumActor ! Operand(10) sumActor.underlyingActor.sum must be (10) } "change internal value when receives a message, multi" in { val sumActor = system.actorOf(Props[SumActor]) sumActor ! Operand(10) sumActor ! Operand(20) sumActor ! GetSum(testActor) expectMsg(30) } } }
内部状態の直接参照
TestActorRef
でActorをラップすると不思議な力でActorの内部に触れるようになるので*2、メッセージを送信した後目的の内部状態になっているかをアサートできるようになります。
val sumActor = TestActorRef[SumActor] sumActor ! Operand(10) sumActor.underlyingActor.sum must be (10)
メッセージ経由での内部状態の参照
case GetSum(sender) => sender ! sum
のようにActorが処理するメッセージの中に内部状態をエクスポートするハンドラを用意することでメッセージ経由で内部状態をアサートできるようになります。
val sumActor = system.actorOf(Props[SumActor]) sumActor ! Operand(10) sumActor ! Operand(20) sumActor ! GetSum(testActor) expectMsg(30)
どちらの方法でもActorに本来必要のないロジックを追加することになるのでなんだかなぁという感じになりますが、まぁこの辺は妥協が必要なところってことでしょうか。
メッセージのアサート
ここでは特定の条件でメッセージをフィルタしつつ送信者に送り返すActorのテストを想定します。
import akka.actor.Actor import jyuch.FilterEchoActor.Message class FilterEchoActor(filter: String) extends Actor { override def receive: Receive = { case Message(msg) => { if (msg.startsWith(filter)) { sender() ! Message(msg) } } } } object FilterEchoActor { def props(filter: String) = { akka.actor.Props(new FilterEchoActor(filter)) } case class Message(msg: String) }
import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import jyuch.FilterEchoActor.Message import org.scalatest.{MustMatchers, WordSpecLike} class FilterEchoActorTest extends TestKit(ActorSystem("testsystem")) with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll { "FilterEchoActor" must { "return message when message starts with provided string" in { val echoActor = system.actorOf(FilterEchoActor.props("Hello")) echoActor ! Message("Hello world") echoActor ! Message("Hello jyuch") echoActor ! Message("Hoge world") echoActor ! Message("Hoge jyuch") echoActor ! Message("Hello world") val msg = receiveN(3) msg must be(Vector(Message("Hello world"), Message("Hello jyuch"), Message("Hello world"))) } } }
ImplicitSender
トレイトをmix-inするとメッセージの送信者を暗黙的にtestActor
にしてくれます。
フィルタの条件を指定してActorを生成しメッセージを送信し、送り返されたメッセージが望んだ結果かをアサートしています。
おわりに
余談ってほど余談ではないのですが、Actorにコンストラクタ引数で何かしらのパラメータを渡したいときはコンパニオンオブジェクトにファクトリメソッドを定義する方法が推奨されているようです。
また、この記事を書いているときに思いついたのですが、今年の目標に「しょうもない小ネタを挟むために時間を使わない」も付け加えておきます。
元旦
Scalaでも値が範囲に入っているか確認したい
はじめにとおわりに
Scalaで整数が特定の範囲に入っているかを宣言的に書けないかなと思ったのですが、やっぱり書けました。
val range = 0 to 255 println(range contains -1) println(range contains 0) println(range contains 20) println(range contains 255) println(range contains 256) val range2 = 0 until 255 println(range2 contains 254) println(range2 contains 255)
false true true true false true false
これだけです。
おわり
Akkaのエラーハンドリングストラテジを確認したい
はじめに
Akkaを実運用に突っ込もうと考えたときに一番最初に気になるのはActorがくたばった時の挙動ですよね。 知らない間にActorがくたばっていてメッセージだけが虚空の彼方に消えていったなんて事態になった日には次回の自身のボーナスも虚空に消えかねません。
ボーナスを死守するためにもエラーハンドリングは必須の知識として学ぶべきです。
サンプルコード
今回はこんな感じのサンプルコードを使用します。
package org.jyuch import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop} import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props} import scala.concurrent.duration._ import scala.language.postfixOps object Hello { def main(args: Array[String]): Unit = { val system = ActorSystem() val calc = system.actorOf(CalcActor.props()) calc ! Add(1, 2) calc ! Add(-1, 2) calc ! Add(1, 2) calc ! Subtract(3, 2) io.StdIn.readLine() system.terminate() } } trait OperandActor extends Actor { override def postRestart(reason: Throwable): Unit = { super.postRestart(reason) //println(reason.getClass.getTypeName) } } class CalcActor extends OperandActor { val add = context.actorOf(AddActor.props(), "add") val subtract = context.actorOf(SubtractActor.props(), "subtract") def receive = { case Result(v) =>{ println(this + " : " + v) } case a: Add => add ! a case s: Subtract => subtract ! s } override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: Exception => Stop } } object CalcActor { def props(): Props = Props[CalcActor] } class AddActor extends OperandActor { def receive = { case Add(x, y) => { println(this) if (x < 0 || y < 0) throw new Exception() sender() ! Result(x + y) } } } object AddActor { def props() = Props[AddActor] } class SubtractActor extends OperandActor { def receive = { case Subtract(x, y) => { println(this) if (x - y < 0) throw new Exception() sender() ! Result(x - y) } } } object SubtractActor { def props() = Props[SubtractActor] } sealed abstract class Operand case class Add(x: Int, y: Int) extends Operand case class Subtract(x: Int, y: Int) extends Operand case class Result(value: Int)
子Actorの後始末
エラーハンドリングストラテジは親ActorのsupervisorStrategy
で制御されますが、何も指定しない(デフォルト)とくたばったActorだけを再起動します。
supervisorStrategy
で指定できるストラテジは以下の感じになります。
ストラテジ | 意味 |
---|---|
OneForOneStrategy |
子Actorがくたばったらそいつだけ何とかする |
AllForOneStrategy |
子Actorがくたばったらすべての子Actorを何とかする |
で、何とかする内容が以下の感じです。
ストラテジ | 意味 |
---|---|
Escalate |
上位のActorに判断を委譲する |
Restart |
子Actorを再起動する |
Resume |
エラーを気にせず続行する |
Stop |
子Actorを停止する |
そして、ストラテジをこんな感じに指定します。
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: IllegalArgumentException ⇒ Stop case _: Exception ⇒ Escalate }
ガーディアンアクター
上記の指定で子がくたばった時の挙動を制御できるようになりました。じゃあ自身がくたばったらどうなるのさ?
自身がほかのActorの子だったらそいつが決めますが、自身がsystem.actorOf
が生成されていた場合は自身の命運はガーディアンアクターが決めます。
Akka 2.1以降では設定ファイルのakka.actor.guardian-supervisor-strategy
でガーディアンアクターのストラテジを設定出来ます。
actor { # Either one of "local", "remote" or "cluster" or the # FQCN of the ActorRefProvider to be used; the below is the built-in default, # note that "remote" and "cluster" requires the akka-remote and akka-cluster # artifacts to be on the classpath. provider = "local" # The guardian "/user" will use this class to obtain its supervisorStrategy. # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator. # In addition to the default there is akka.actor.StoppingSupervisorStrategy. guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"
現状ではakka.actor.StoppingSupervisorStrategy
かakka.actor.DefaultSupervisorStrategy
が指定できるようです。
おわりに
これならActorがくたばったときにいい感じにActorを再起動できそうです。
これでボーナスを心配することなく安心して今夜も眠ることが出来そうです。
おわり