AkkaでもActorの再起動を遅延したい

はじめに

Actorはエラー発生時に再起動させることが出来ますが、デフォルトの挙動では可能な限りすぐに再起動をさせようとするためDBやWebサービス等の障害が原因ですぐに復旧しない場合はActorの生成→停止を繰り返し死屍累々となることが予想出来ます。

という訳でBackoffSupervisorパターンを使って再起動を遅延させる方法を確認してみました。

Actor

今回テストで使用するActorの実装は以下の感じです。

メッセージとして例外を受け取ったらそれをスローします。

import akka.actor.Actor
import akka.event.Logging

class HogeActor extends Actor {
  val log = Logging(context.system, this)

  override def receive = {
    case e: Exception => {
      throw e
    }
    case message: String => {
      log.info(message)
    }
  }

  override def preStart() = {
    log.info("preStart")
  }
}

BackoffSupervisor

再起動を遅延させるためにはPropsを生成するときにBackoffSupervisorを挟んで生成するようです。

val supervisor = BackoffSupervisor.props(
  Backoff.onFailure(
    Props[HogeActor],
    childName = "hoge",
    minBackoff = 1 seconds,
    maxBackoff = 1 seconds,
    randomFactor = 0.02
  ).withSupervisorStrategy(
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minutes) {
      case _: NullPointerException => SupervisorStrategy.Restart
      case _: Exception => SupervisorStrategy.Resume
    }
  )
)

ここでは最小遅延が1秒、最大遅延が1秒でランダムファクターが2%で遅延させます。 ここでのランダムファクターとは、複数のActorが同時に再起動しようとして負荷が跳ね上がるのを防ぐために遅延をランダムに変動させるために使用される値です。

また、SupervisorStrategyとしてNullPointerExceptionの時は再起動、それ以外の時は再開させてます。また、1分以内に5回エラーを送出したら子Actorを停止させます

val hoge = system.actorOf(supervisor, "hoge")

hoge ! "hello"
hoge ! new NullPointerException()
hoge ! "fuga"
hoge ! "fuga"
hoge ! "fuga"

Thread.sleep(1500)

hoge ! new Exception()
hoge ! "piyo"

hoge ! new NullPointerException()
Thread.sleep(1500)
hoge ! new NullPointerException()
Thread.sleep(1500)
hoge ! new NullPointerException()
Thread.sleep(1500)
hoge ! new NullPointerException()
Thread.sleep(1500)
hoge ! new NullPointerException()
Thread.sleep(1500)
hoge ! "fugafuga"
[INFO] [01/08/2018 21:30:36.728] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart
[INFO] [01/08/2018 21:30:36.729] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] hello
[ERROR] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:31)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/08/2018 21:30:37.776] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart
[WARN] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null
[INFO] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] piyo
[ERROR] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:41)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:39.249] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] preStart
[ERROR] [01/08/2018 21:30:39.724] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:43)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:40.757] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart
[ERROR] [01/08/2018 21:30:41.227] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:45)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:42.265] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] preStart
[ERROR] [01/08/2018 21:30:42.733] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:47)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:43.777] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] preStart
[ERROR] [01/08/2018 21:30:44.234] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null
java.lang.NullPointerException
    at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:49)
    at jyuch.Hello$delayedInit$body.apply(Hello.scala:10)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at jyuch.Hello$.main(Hello.scala:10)
    at jyuch.Hello.main(Hello.scala)

[INFO] [01/08/2018 21:30:45.741] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge#324524509] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Actorが再起動する前にメッセージボックスに到達したメッセージはdead-letters扱いになるようです。

また、5回エラーを送出された時点でActorの再起動を停止しています。

おわりに

github.com

おわり

VB.NETでもリアクティブしたい(Akka.NET)

はじめに

2018年は殺伐としたVB.NET界隈にAkka.NETの波が到来します。(しません)

という訳であまり需要がなさげなエントリですが*1、ライブラリ自体は結構成熟しているっぽいので実運用に突っ込んでもまぁ行けるんじゃないかなと思います。

Akkaとは

Akkaの概念自体については割と記事があるのでここでは割愛します。

インストールパッケージ

今回の記事で作成したプロジェクトでは以下のパッケージをNuGetでインストールします。

  • Akka(Akka本体)
  • Akka.Logger.Serilog(Serilogのログアダプタ)
  • Serilog(ロガー)
  • Serilog.Sinks.Console(Serilogのコンソールシンクの実装)

今回はロガーとしてSerilogを使用しますが、Serilog自体の説明は省略します。

メッセージとActorの実装

Akka.NETではActorはReceiveActor(もしくはUntypedActor)を継承して実装します。

ReceiveActorではコンストラクタ中で処理するメッセージの型とメッセージを処理するデリゲートを指定します。

Public Sub New(dest As IActorRef)
    _dest = dest
    Receive(Of String)(AddressOf ReceiveString)
    Receive(Of Greet)(AddressOf ReceiveGreet)
End Sub

また、メッセージは(Akkaに限らず)スレッド間を行き来するためイミュータブルになるように実装します。*2

本家のScalaではcase classで所望のクラスが簡単に量産できますが、VBではイミュータブルなクラスの定義がクッソ面倒*3なので弊社のプロダクト環境ではメタプログラミングとT4を駆使してコード生成をして対応しています。が本題ではないのでここでは触れません。

Public Class HelloActor
    Inherits ReceiveActor

    Private ReadOnly _logger As ILoggingAdapter = Context.GetLogger(New SerilogLogMessageFormatter())
    Private ReadOnly _dest As IActorRef

    Public Sub New(dest As IActorRef)
        _dest = dest
        Receive(Of String)(AddressOf ReceiveString)
        Receive(Of Greet)(AddressOf ReceiveGreet)
    End Sub

    Public Sub ReceiveString(msg As String)
        _dest.Tell(New Greet(msg))
    End Sub

    Public Sub ReceiveGreet(msg As Greet)
        _logger.Info("Returned {Message}", msg.Message)
    End Sub

End Class

Public Class EchoActor
    Inherits ReceiveActor

    Private ReadOnly _logger As ILoggingAdapter = Context.GetLogger(New SerilogLogMessageFormatter())

    Public Sub New()
        Receive(Of Greet)(AddressOf ReceiveGreet)
    End Sub

    Public Sub ReceiveGreet(msg As Greet)
        _logger.Info("Receive {Message}", msg.Message)
        Sender.Tell(New Greet($"You said {msg.Message}"))
    End Sub

End Class

Public Class Greet
    Public ReadOnly Property Message As String

    Public Sub New(msg As String)
        Message = msg
    End Sub

End Class

余談ですが、なんで

Receive(Of Greet)(AddressOf ReceiveGreet)

みたいな書き方をしているかなのですが、C#では

Receive<string>(message => {
  log.Info("Received String message: {0}", message);
  Sender.Tell(message);
});

みたいにラムダ式を使ってかなりきれいに書けますが、VBでは

Receive(Of Greet)(Sub(message)
                      _logger.Info("Returned {Message}", message.Message)
                  End Sub)

みたいになってお゛ぉ゛ぉ゛ぉ゛ぉ゛ぉ゛ぉ゛んってなるからです。 特に行数が10行を超えた時点でクッソ読みづらくなります。

ActorSystem

あとは本家と同じようにActorSystemを生成しーのActorを生成しーのトツギーノでリアクティブできます。

Module Program

    Dim conf As String = "
akka {
    loggers=[""Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog""]

    stdout-loglevel = off
    loglevel = DEBUG
    log-config-on-start = on
    actor {
        debug {
              receive = on 
              autoreceive = on
              lifecycle = on
              event-stream = on
              unhandled = on
        }
    }
}
"

    Sub Main()
        Dim logger = New LoggerConfiguration().
            WriteTo.Console().
            MinimumLevel.Debug().
            CreateLogger()
        Log.Logger = logger

        Dim system = ActorSystem.Create("vb-actor", conf)
        Dim echo = system.ActorOf(Of EchoActor)("echo")
        Dim hello = system.ActorOf(Props.Create(Function() New HelloActor(echo)))

        hello.Tell("hello")
        hello.Tell("hoge")

        Console.ReadLine()
        system.Terminate().Wait()
    End Sub

End Module
[23:09:14 INF] Receive hello
[23:09:14 INF] Receive hoge
[23:09:14 INF] Returned You said hello
[23:09:14 INF] Returned You said hoge

おわりに

Akka.NETはいかんせん日本語記事がほとんど存在しないので割と茨の道ですが、英語の記事自体は結構あるうえに本家のドキュメントも脳内で.NET向けにトランスパイルすれば割とそのまま適用できるので思ったよりは障壁は低いと思います。

なお、IntelliSenseの説明文がときたま「TBD」になっており、『はぁそうですか』ってなります。

f:id:jyuch:20180103233642p:plain

github.com

おわり

*1:執筆時点ではC#ですら日本語でヒットする記事は1件、VBに至っては英語ですらヒットしない

*2:マルチスレッドの大原則として不変な値は常にスレッドセーフというものがあります

*3:特にVB14以前

Akkaでもテストを実施したい

はじめに

あけましておめでとうございます

今年の目標も前年と同様に「生き抜く」を主軸としてなんかいい感じに頑張ります。

今回はAkkaのテストについてです。

今回の内容はほとんどAkka実践バイブル(翔泳社 2017年)から参考にさせて頂いた*1のですが、ActorSystemのシャットダウン方法が2.4でdeprecated入りしたActorSystem.shutdownを使っている所に若干の不安を抱きますがまぁ英語版の書籍が出て日本語訳が出るまでにタイムラグが発生すんだからまぁ仕方ないよねと言った感じです。

最終的に何が言いたいのかというと、素晴らしい本をありがとうございます。

概要

Akka(というよりはアクターモデル)ではActorにメッセージを送信し、メッセージを受け取ったActorは自身の内部状態を変化させたり受信したメッセージをそのままもしくは改変して別のActorに送信します。(データベースやKafkaのようなメッセージシステムに出力する場合もありますが、今回はその辺はやりません。) 逆に言うとActorはメッセージを送らない限り何もしません。

そこで、テスト用のActorSystemを作成し、そのActorSystem内でActorにメッセージを送信し返信メッセージもしくは内部状態を捕捉してAssertするのが基本戦略になるっぽいです。

f:id:jyuch:20180101201509p:plain

ヘルパトレイト

ここではテストケースが終了したタイミングをフックしてActorSystemを終了するトレイトをあらかじめ定義しておきます。

import org.scalatest.{Suite, BeforeAndAfterAll}
import akka.testkit.TestKit

trait StopSystemAfterAll extends BeforeAndAfterAll {
  this :TestKit with Suite =>

  override protected def afterAll(): Unit = {
    super.afterAll()
    system.terminate()
  }
}

内部状態のアサート

ここではSumActorという、受信した値の合計をひたすら加算するActorのテストを想定します。

import akka.actor.{Actor, ActorRef}
import jyuch.SumActor.{GetSum, Operand}

class SumActor extends Actor {
  var sum: Int = 0

  override def receive: Receive = {
    case Operand(value) => {
      sum += value
    }
    case GetSum(sender) => sender ! sum
  }

  def currentSum = sum
}

object SumActor {

  case class Operand(value: Int)

  case class GetSum(sender: ActorRef)

}
import akka.actor.{ActorSystem, Props}
import akka.testkit.{TestActorRef, TestKit}
import jyuch.SumActor.{GetSum, Operand}
import org.scalatest.{MustMatchers, WordSpecLike}

class SumActorTest extends TestKit(ActorSystem("testsystem"))
  with WordSpecLike
  with MustMatchers
  with StopSystemAfterAll {

  "SumActor" must {
    "change internal value when receives a message" in {
      val sumActor = TestActorRef[SumActor]
      sumActor ! Operand(10)
      sumActor.underlyingActor.sum must be (10)
    }

    "change internal value when receives a message, multi" in {
      val sumActor = system.actorOf(Props[SumActor])
      sumActor ! Operand(10)
      sumActor ! Operand(20)
      sumActor ! GetSum(testActor)
      expectMsg(30)
    }
  }
}

内部状態の直接参照

TestActorRefでActorをラップすると不思議な力でActorの内部に触れるようになるので*2、メッセージを送信した後目的の内部状態になっているかをアサートできるようになります。

val sumActor = TestActorRef[SumActor]
sumActor ! Operand(10)
sumActor.underlyingActor.sum must be (10)

メッセージ経由での内部状態の参照

case GetSum(sender) => sender ! sumのようにActorが処理するメッセージの中に内部状態をエクスポートするハンドラを用意することでメッセージ経由で内部状態をアサートできるようになります。

val sumActor = system.actorOf(Props[SumActor])
sumActor ! Operand(10)
sumActor ! Operand(20)
sumActor ! GetSum(testActor)
expectMsg(30)

どちらの方法でもActorに本来必要のないロジックを追加することになるのでなんだかなぁという感じになりますが、まぁこの辺は妥協が必要なところってことでしょうか。

メッセージのアサート

ここでは特定の条件でメッセージをフィルタしつつ送信者に送り返すActorのテストを想定します。

import akka.actor.Actor
import jyuch.FilterEchoActor.Message

class FilterEchoActor(filter: String) extends Actor {
  override def receive: Receive = {
    case Message(msg) => {
      if (msg.startsWith(filter)) {
        sender() ! Message(msg)
      }
    }
  }
}

object FilterEchoActor {

  def props(filter: String) = {
    akka.actor.Props(new FilterEchoActor(filter))
  }

  case class Message(msg: String)

}
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import jyuch.FilterEchoActor.Message
import org.scalatest.{MustMatchers, WordSpecLike}

class FilterEchoActorTest extends TestKit(ActorSystem("testsystem"))
  with WordSpecLike
  with MustMatchers
  with ImplicitSender
  with StopSystemAfterAll {

  "FilterEchoActor" must {
    "return message when message starts with provided string" in {
      val echoActor = system.actorOf(FilterEchoActor.props("Hello"))

      echoActor ! Message("Hello world")
      echoActor ! Message("Hello jyuch")
      echoActor ! Message("Hoge world")
      echoActor ! Message("Hoge jyuch")
      echoActor ! Message("Hello world")

      val msg = receiveN(3)

      msg must be(Vector(Message("Hello world"), Message("Hello jyuch"), Message("Hello world")))
    }
  }
}

ImplicitSenderトレイトをmix-inするとメッセージの送信者を暗黙的にtestActorにしてくれます。

フィルタの条件を指定してActorを生成しメッセージを送信し、送り返されたメッセージが望んだ結果かをアサートしています。

おわりに

余談ってほど余談ではないのですが、Actorにコンストラクタ引数で何かしらのパラメータを渡したいときはコンパニオンオブジェクトにファクトリメソッドを定義する方法が推奨されているようです。

また、この記事を書いているときに思いついたのですが、今年の目標に「しょうもない小ネタを挟むために時間を使わない」も付け加えておきます。

元旦

github.com

*1:このブログを読むよりそちらを買って読んだほうがいいです

*2:ついでに不思議な力でシングルスレッドで動作するようになる