Akkaでもテストを実施したい

はじめに

あけましておめでとうございます

今年の目標も前年と同様に「生き抜く」を主軸としてなんかいい感じに頑張ります。

今回はAkkaのテストについてです。

今回の内容はほとんどAkka実践バイブル(翔泳社 2017年)から参考にさせて頂いた*1のですが、ActorSystemのシャットダウン方法が2.4でdeprecated入りしたActorSystem.shutdownを使っている所に若干の不安を抱きますがまぁ英語版の書籍が出て日本語訳が出るまでにタイムラグが発生すんだからまぁ仕方ないよねと言った感じです。

最終的に何が言いたいのかというと、素晴らしい本をありがとうございます。

概要

Akka(というよりはアクターモデル)ではActorにメッセージを送信し、メッセージを受け取ったActorは自身の内部状態を変化させたり受信したメッセージをそのままもしくは改変して別のActorに送信します。(データベースやKafkaのようなメッセージシステムに出力する場合もありますが、今回はその辺はやりません。) 逆に言うとActorはメッセージを送らない限り何もしません。

そこで、テスト用のActorSystemを作成し、そのActorSystem内でActorにメッセージを送信し返信メッセージもしくは内部状態を捕捉してAssertするのが基本戦略になるっぽいです。

f:id:jyuch:20180101201509p:plain

ヘルパトレイト

ここではテストケースが終了したタイミングをフックしてActorSystemを終了するトレイトをあらかじめ定義しておきます。

import org.scalatest.{Suite, BeforeAndAfterAll}
import akka.testkit.TestKit

trait StopSystemAfterAll extends BeforeAndAfterAll {
  this :TestKit with Suite =>

  override protected def afterAll(): Unit = {
    super.afterAll()
    system.terminate()
  }
}

内部状態のアサート

ここではSumActorという、受信した値の合計をひたすら加算するActorのテストを想定します。

import akka.actor.{Actor, ActorRef}
import jyuch.SumActor.{GetSum, Operand}

class SumActor extends Actor {
  var sum: Int = 0

  override def receive: Receive = {
    case Operand(value) => {
      sum += value
    }
    case GetSum(sender) => sender ! sum
  }

  def currentSum = sum
}

object SumActor {

  case class Operand(value: Int)

  case class GetSum(sender: ActorRef)

}
import akka.actor.{ActorSystem, Props}
import akka.testkit.{TestActorRef, TestKit}
import jyuch.SumActor.{GetSum, Operand}
import org.scalatest.{MustMatchers, WordSpecLike}

class SumActorTest extends TestKit(ActorSystem("testsystem"))
  with WordSpecLike
  with MustMatchers
  with StopSystemAfterAll {

  "SumActor" must {
    "change internal value when receives a message" in {
      val sumActor = TestActorRef[SumActor]
      sumActor ! Operand(10)
      sumActor.underlyingActor.sum must be (10)
    }

    "change internal value when receives a message, multi" in {
      val sumActor = system.actorOf(Props[SumActor])
      sumActor ! Operand(10)
      sumActor ! Operand(20)
      sumActor ! GetSum(testActor)
      expectMsg(30)
    }
  }
}

内部状態の直接参照

TestActorRefでActorをラップすると不思議な力でActorの内部に触れるようになるので*2、メッセージを送信した後目的の内部状態になっているかをアサートできるようになります。

val sumActor = TestActorRef[SumActor]
sumActor ! Operand(10)
sumActor.underlyingActor.sum must be (10)

メッセージ経由での内部状態の参照

case GetSum(sender) => sender ! sumのようにActorが処理するメッセージの中に内部状態をエクスポートするハンドラを用意することでメッセージ経由で内部状態をアサートできるようになります。

val sumActor = system.actorOf(Props[SumActor])
sumActor ! Operand(10)
sumActor ! Operand(20)
sumActor ! GetSum(testActor)
expectMsg(30)

どちらの方法でもActorに本来必要のないロジックを追加することになるのでなんだかなぁという感じになりますが、まぁこの辺は妥協が必要なところってことでしょうか。

メッセージのアサート

ここでは特定の条件でメッセージをフィルタしつつ送信者に送り返すActorのテストを想定します。

import akka.actor.Actor
import jyuch.FilterEchoActor.Message

class FilterEchoActor(filter: String) extends Actor {
  override def receive: Receive = {
    case Message(msg) => {
      if (msg.startsWith(filter)) {
        sender() ! Message(msg)
      }
    }
  }
}

object FilterEchoActor {

  def props(filter: String) = {
    akka.actor.Props(new FilterEchoActor(filter))
  }

  case class Message(msg: String)

}
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import jyuch.FilterEchoActor.Message
import org.scalatest.{MustMatchers, WordSpecLike}

class FilterEchoActorTest extends TestKit(ActorSystem("testsystem"))
  with WordSpecLike
  with MustMatchers
  with ImplicitSender
  with StopSystemAfterAll {

  "FilterEchoActor" must {
    "return message when message starts with provided string" in {
      val echoActor = system.actorOf(FilterEchoActor.props("Hello"))

      echoActor ! Message("Hello world")
      echoActor ! Message("Hello jyuch")
      echoActor ! Message("Hoge world")
      echoActor ! Message("Hoge jyuch")
      echoActor ! Message("Hello world")

      val msg = receiveN(3)

      msg must be(Vector(Message("Hello world"), Message("Hello jyuch"), Message("Hello world")))
    }
  }
}

ImplicitSenderトレイトをmix-inするとメッセージの送信者を暗黙的にtestActorにしてくれます。

フィルタの条件を指定してActorを生成しメッセージを送信し、送り返されたメッセージが望んだ結果かをアサートしています。

おわりに

余談ってほど余談ではないのですが、Actorにコンストラクタ引数で何かしらのパラメータを渡したいときはコンパニオンオブジェクトにファクトリメソッドを定義する方法が推奨されているようです。

また、この記事を書いているときに思いついたのですが、今年の目標に「しょうもない小ネタを挟むために時間を使わない」も付け加えておきます。

元旦

github.com

*1:このブログを読むよりそちらを買って読んだほうがいいです

*2:ついでに不思議な力でシングルスレッドで動作するようになる

Scalaでも値が範囲に入っているか確認したい

はじめにとおわりに

Scalaで整数が特定の範囲に入っているかを宣言的に書けないかなと思ったのですが、やっぱり書けました。

val range = 0 to 255
println(range contains -1)
println(range contains 0)
println(range contains 20)
println(range contains 255)
println(range contains 256)

val range2 = 0 until 255
println(range2 contains 254)
println(range2 contains 255)
false
true
true
true
false
true
false

これだけです。

おわり

Akkaのエラーハンドリングストラテジを確認したい

はじめに

Akkaを実運用に突っ込もうと考えたときに一番最初に気になるのはActorがくたばった時の挙動ですよね。 知らない間にActorがくたばっていてメッセージだけが虚空の彼方に消えていったなんて事態になった日には次回の自身のボーナスも虚空に消えかねません。

ボーナスを死守するためにもエラーハンドリングは必須の知識として学ぶべきです。

サンプルコード

今回はこんな感じのサンプルコードを使用します。

package org.jyuch

import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}

import scala.concurrent.duration._

import scala.language.postfixOps

object Hello {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    val calc = system.actorOf(CalcActor.props())

    calc ! Add(1, 2)
    calc ! Add(-1, 2)
    calc ! Add(1, 2)
    calc ! Subtract(3, 2)

    io.StdIn.readLine()
    system.terminate()
  }
}

trait OperandActor extends Actor {
  override def postRestart(reason: Throwable): Unit = {
    super.postRestart(reason)
    //println(reason.getClass.getTypeName)
  }
}

class CalcActor extends OperandActor {
  val add = context.actorOf(AddActor.props(), "add")
  val subtract = context.actorOf(SubtractActor.props(), "subtract")

  def receive = {
    case Result(v) =>{
      println(this + " : " + v)
    }
    case a: Add => add ! a
    case s: Subtract => subtract ! s
  }

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: Exception => Stop
    }
}

object CalcActor {
  def props(): Props = Props[CalcActor]
}

class AddActor extends OperandActor {
  def receive = {
    case Add(x, y) => {
      println(this)
      if (x < 0 || y < 0) throw new Exception()
      sender() ! Result(x + y)
    }
  }
}

object AddActor {
  def props() = Props[AddActor]
}

class SubtractActor extends OperandActor {
  def receive = {
    case Subtract(x, y) => {
      println(this)
      if (x - y < 0) throw new Exception()
      sender() ! Result(x - y)
    }
  }
}

object SubtractActor {
  def props() = Props[SubtractActor]
}

sealed abstract class Operand

case class Add(x: Int, y: Int) extends Operand

case class Subtract(x: Int, y: Int) extends Operand

case class Result(value: Int)

子Actorの後始末

エラーハンドリングストラテジは親ActorのsupervisorStrategyで制御されますが、何も指定しない(デフォルト)とくたばったActorだけを再起動します。

supervisorStrategyで指定できるストラテジは以下の感じになります。

ストラテジ 意味
OneForOneStrategy 子Actorがくたばったらそいつだけ何とかする
AllForOneStrategy 子Actorがくたばったらすべての子Actorを何とかする

で、何とかする内容が以下の感じです。

ストラテジ 意味
Escalate 上位のActorに判断を委譲する
Restart 子Actorを再起動する
Resume エラーを気にせず続行する
Stop 子Actorを停止する

そして、ストラテジをこんな感じに指定します。

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      ⇒ Resume
    case _: NullPointerException     ⇒ Restart
    case _: IllegalArgumentException ⇒ Stop
    case _: Exception                ⇒ Escalate
  }

ガーディアンアクター

上記の指定で子がくたばった時の挙動を制御できるようになりました。じゃあ自身がくたばったらどうなるのさ?

自身がほかのActorの子だったらそいつが決めますが、自身がsystem.actorOfが生成されていた場合は自身の命運はガーディアンアクターが決めます。

doc.akka.io

Akka 2.1以降では設定ファイルのakka.actor.guardian-supervisor-strategyでガーディアンアクターのストラテジを設定出来ます。

doc.akka.io

  actor {

    # Either one of "local", "remote" or "cluster" or the
    # FQCN of the ActorRefProvider to be used; the below is the built-in default,
    # note that "remote" and "cluster" requires the akka-remote and akka-cluster
    # artifacts to be on the classpath.
    provider = "local"

    # The guardian "/user" will use this class to obtain its supervisorStrategy.
    # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator.
    # In addition to the default there is akka.actor.StoppingSupervisorStrategy.
    guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"

現状ではakka.actor.StoppingSupervisorStrategyakka.actor.DefaultSupervisorStrategyが指定できるようです。

おわりに

これならActorがくたばったときにいい感じにActorを再起動できそうです。

これでボーナスを心配することなく安心して今夜も眠ることが出来そうです。

おわり

github.com