AkkaでもActorの再起動を遅延したい
はじめに
Actorはエラー発生時に再起動させることが出来ますが、デフォルトの挙動では可能な限りすぐに再起動をさせようとするためDBやWebサービス等の障害が原因ですぐに復旧しない場合はActorの生成→停止を繰り返し死屍累々となることが予想出来ます。
という訳でBackoffSupervisorパターンを使って再起動を遅延させる方法を確認してみました。
Actor
今回テストで使用するActorの実装は以下の感じです。
メッセージとして例外を受け取ったらそれをスローします。
import akka.actor.Actor import akka.event.Logging class HogeActor extends Actor { val log = Logging(context.system, this) override def receive = { case e: Exception => { throw e } case message: String => { log.info(message) } } override def preStart() = { log.info("preStart") } }
BackoffSupervisor
再起動を遅延させるためにはProps
を生成するときにBackoffSupervisor
を挟んで生成するようです。
val supervisor = BackoffSupervisor.props( Backoff.onFailure( Props[HogeActor], childName = "hoge", minBackoff = 1 seconds, maxBackoff = 1 seconds, randomFactor = 0.02 ).withSupervisorStrategy( OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minutes) { case _: NullPointerException => SupervisorStrategy.Restart case _: Exception => SupervisorStrategy.Resume } ) )
ここでは最小遅延が1秒、最大遅延が1秒でランダムファクターが2%で遅延させます。 ここでのランダムファクターとは、複数のActorが同時に再起動しようとして負荷が跳ね上がるのを防ぐために遅延をランダムに変動させるために使用される値です。
また、SupervisorStrategy
としてNullPointerException
の時は再起動、それ以外の時は再開させてます。また、1分以内に5回エラーを送出したら子Actorを停止させます。
val hoge = system.actorOf(supervisor, "hoge") hoge ! "hello" hoge ! new NullPointerException() hoge ! "fuga" hoge ! "fuga" hoge ! "fuga" Thread.sleep(1500) hoge ! new Exception() hoge ! "piyo" hoge ! new NullPointerException() Thread.sleep(1500) hoge ! new NullPointerException() Thread.sleep(1500) hoge ! new NullPointerException() Thread.sleep(1500) hoge ! new NullPointerException() Thread.sleep(1500) hoge ! new NullPointerException() Thread.sleep(1500) hoge ! "fugafuga"
[INFO] [01/08/2018 21:30:36.728] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart [INFO] [01/08/2018 21:30:36.729] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] hello [ERROR] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:31) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [01/08/2018 21:30:36.740] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge/hoge#1204086463] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. [INFO] [01/08/2018 21:30:37.776] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart [WARN] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null [INFO] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] piyo [ERROR] [01/08/2018 21:30:38.215] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:41) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:39.249] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] preStart [ERROR] [01/08/2018 21:30:39.724] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:43) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:40.757] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] preStart [ERROR] [01/08/2018 21:30:41.227] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:45) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:42.265] [backoff-akka.actor.default-dispatcher-2] [akka://backoff/user/hoge/hoge] preStart [ERROR] [01/08/2018 21:30:42.733] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:47) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:43.777] [backoff-akka.actor.default-dispatcher-3] [akka://backoff/user/hoge/hoge] preStart [ERROR] [01/08/2018 21:30:44.234] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge/hoge] null java.lang.NullPointerException at jyuch.Hello$.delayedEndpoint$jyuch$Hello$1(Hello.scala:49) at jyuch.Hello$delayedInit$body.apply(Hello.scala:10) at scala.Function0.apply$mcV$sp(Function0.scala:34) at scala.Function0.apply$mcV$sp$(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App.$anonfun$main$1$adapted(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:389) at scala.App.main(App.scala:76) at scala.App.main$(App.scala:74) at jyuch.Hello$.main(Hello.scala:10) at jyuch.Hello.main(Hello.scala) [INFO] [01/08/2018 21:30:45.741] [backoff-akka.actor.default-dispatcher-5] [akka://backoff/user/hoge] Message [java.lang.String] without sender to Actor[akka://backoff/user/hoge#324524509] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Actorが再起動する前にメッセージボックスに到達したメッセージはdead-letters扱いになるようです。
また、5回エラーを送出された時点でActorの再起動を停止しています。
おわりに
おわり
VB.NETでもリアクティブしたい(Akka.NET)
はじめに
2018年は殺伐としたVB.NET界隈にAkka.NETの波が到来します。(しません)
という訳であまり需要がなさげなエントリですが*1、ライブラリ自体は結構成熟しているっぽいので実運用に突っ込んでもまぁ行けるんじゃないかなと思います。
Akkaとは
Akkaの概念自体については割と記事があるのでここでは割愛します。
インストールパッケージ
今回の記事で作成したプロジェクトでは以下のパッケージをNuGetでインストールします。
- Akka(Akka本体)
- Akka.Logger.Serilog(Serilogのログアダプタ)
- Serilog(ロガー)
- Serilog.Sinks.Console(Serilogのコンソールシンクの実装)
今回はロガーとしてSerilogを使用しますが、Serilog自体の説明は省略します。
メッセージとActorの実装
Akka.NETではActorはReceiveActor
(もしくはUntypedActor
)を継承して実装します。
ReceiveActor
ではコンストラクタ中で処理するメッセージの型とメッセージを処理するデリゲートを指定します。
Public Sub New(dest As IActorRef) _dest = dest Receive(Of String)(AddressOf ReceiveString) Receive(Of Greet)(AddressOf ReceiveGreet) End Sub
また、メッセージは(Akkaに限らず)スレッド間を行き来するためイミュータブルになるように実装します。*2
本家のScalaではcase class
で所望のクラスが簡単に量産できますが、VBではイミュータブルなクラスの定義がクッソ面倒*3なので弊社のプロダクト環境ではメタプログラミングとT4を駆使してコード生成をして対応しています。が本題ではないのでここでは触れません。
Public Class HelloActor Inherits ReceiveActor Private ReadOnly _logger As ILoggingAdapter = Context.GetLogger(New SerilogLogMessageFormatter()) Private ReadOnly _dest As IActorRef Public Sub New(dest As IActorRef) _dest = dest Receive(Of String)(AddressOf ReceiveString) Receive(Of Greet)(AddressOf ReceiveGreet) End Sub Public Sub ReceiveString(msg As String) _dest.Tell(New Greet(msg)) End Sub Public Sub ReceiveGreet(msg As Greet) _logger.Info("Returned {Message}", msg.Message) End Sub End Class Public Class EchoActor Inherits ReceiveActor Private ReadOnly _logger As ILoggingAdapter = Context.GetLogger(New SerilogLogMessageFormatter()) Public Sub New() Receive(Of Greet)(AddressOf ReceiveGreet) End Sub Public Sub ReceiveGreet(msg As Greet) _logger.Info("Receive {Message}", msg.Message) Sender.Tell(New Greet($"You said {msg.Message}")) End Sub End Class Public Class Greet Public ReadOnly Property Message As String Public Sub New(msg As String) Message = msg End Sub End Class
余談ですが、なんで
Receive(Of Greet)(AddressOf ReceiveGreet)
みたいな書き方をしているかなのですが、C#では
Receive<string>(message => { log.Info("Received String message: {0}", message); Sender.Tell(message); });
Receive(Of Greet)(Sub(message) _logger.Info("Returned {Message}", message.Message) End Sub)
みたいになってお゛ぉ゛ぉ゛ぉ゛ぉ゛ぉ゛ぉ゛んってなるからです。 特に行数が10行を超えた時点でクッソ読みづらくなります。
ActorSystem
あとは本家と同じようにActorSystem
を生成しーのActor
を生成しーのトツギーノでリアクティブできます。
Module Program Dim conf As String = " akka { loggers=[""Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog""] stdout-loglevel = off loglevel = DEBUG log-config-on-start = on actor { debug { receive = on autoreceive = on lifecycle = on event-stream = on unhandled = on } } } " Sub Main() Dim logger = New LoggerConfiguration(). WriteTo.Console(). MinimumLevel.Debug(). CreateLogger() Log.Logger = logger Dim system = ActorSystem.Create("vb-actor", conf) Dim echo = system.ActorOf(Of EchoActor)("echo") Dim hello = system.ActorOf(Props.Create(Function() New HelloActor(echo))) hello.Tell("hello") hello.Tell("hoge") Console.ReadLine() system.Terminate().Wait() End Sub End Module
[23:09:14 INF] Receive hello [23:09:14 INF] Receive hoge [23:09:14 INF] Returned You said hello [23:09:14 INF] Returned You said hoge
おわりに
Akka.NETはいかんせん日本語記事がほとんど存在しないので割と茨の道ですが、英語の記事自体は結構あるうえに本家のドキュメントも脳内で.NET向けにトランスパイルすれば割とそのまま適用できるので思ったよりは障壁は低いと思います。
なお、IntelliSenseの説明文がときたま「TBD」になっており、『はぁそうですか』ってなります。
おわり
Akkaでもテストを実施したい
はじめに
あけましておめでとうございます
今年の目標も前年と同様に「生き抜く」を主軸としてなんかいい感じに頑張ります。
今回はAkkaのテストについてです。
今回の内容はほとんどAkka実践バイブル(翔泳社 2017年)から参考にさせて頂いた*1のですが、ActorSystem
のシャットダウン方法が2.4でdeprecated
入りしたActorSystem.shutdown
を使っている所に若干の不安を抱きますがまぁ英語版の書籍が出て日本語訳が出るまでにタイムラグが発生すんだからまぁ仕方ないよねと言った感じです。
最終的に何が言いたいのかというと、素晴らしい本をありがとうございます。
概要
Akka(というよりはアクターモデル)ではActorにメッセージを送信し、メッセージを受け取ったActorは自身の内部状態を変化させたり受信したメッセージをそのままもしくは改変して別のActorに送信します。(データベースやKafkaのようなメッセージシステムに出力する場合もありますが、今回はその辺はやりません。) 逆に言うとActorはメッセージを送らない限り何もしません。
そこで、テスト用のActorSystem
を作成し、そのActorSystem
内でActorにメッセージを送信し返信メッセージもしくは内部状態を捕捉してAssertするのが基本戦略になるっぽいです。
ヘルパトレイト
ここではテストケースが終了したタイミングをフックしてActorSystem
を終了するトレイトをあらかじめ定義しておきます。
import org.scalatest.{Suite, BeforeAndAfterAll} import akka.testkit.TestKit trait StopSystemAfterAll extends BeforeAndAfterAll { this :TestKit with Suite => override protected def afterAll(): Unit = { super.afterAll() system.terminate() } }
内部状態のアサート
ここではSumActor
という、受信した値の合計をひたすら加算するActorのテストを想定します。
import akka.actor.{Actor, ActorRef} import jyuch.SumActor.{GetSum, Operand} class SumActor extends Actor { var sum: Int = 0 override def receive: Receive = { case Operand(value) => { sum += value } case GetSum(sender) => sender ! sum } def currentSum = sum } object SumActor { case class Operand(value: Int) case class GetSum(sender: ActorRef) }
import akka.actor.{ActorSystem, Props} import akka.testkit.{TestActorRef, TestKit} import jyuch.SumActor.{GetSum, Operand} import org.scalatest.{MustMatchers, WordSpecLike} class SumActorTest extends TestKit(ActorSystem("testsystem")) with WordSpecLike with MustMatchers with StopSystemAfterAll { "SumActor" must { "change internal value when receives a message" in { val sumActor = TestActorRef[SumActor] sumActor ! Operand(10) sumActor.underlyingActor.sum must be (10) } "change internal value when receives a message, multi" in { val sumActor = system.actorOf(Props[SumActor]) sumActor ! Operand(10) sumActor ! Operand(20) sumActor ! GetSum(testActor) expectMsg(30) } } }
内部状態の直接参照
TestActorRef
でActorをラップすると不思議な力でActorの内部に触れるようになるので*2、メッセージを送信した後目的の内部状態になっているかをアサートできるようになります。
val sumActor = TestActorRef[SumActor] sumActor ! Operand(10) sumActor.underlyingActor.sum must be (10)
メッセージ経由での内部状態の参照
case GetSum(sender) => sender ! sum
のようにActorが処理するメッセージの中に内部状態をエクスポートするハンドラを用意することでメッセージ経由で内部状態をアサートできるようになります。
val sumActor = system.actorOf(Props[SumActor]) sumActor ! Operand(10) sumActor ! Operand(20) sumActor ! GetSum(testActor) expectMsg(30)
どちらの方法でもActorに本来必要のないロジックを追加することになるのでなんだかなぁという感じになりますが、まぁこの辺は妥協が必要なところってことでしょうか。
メッセージのアサート
ここでは特定の条件でメッセージをフィルタしつつ送信者に送り返すActorのテストを想定します。
import akka.actor.Actor import jyuch.FilterEchoActor.Message class FilterEchoActor(filter: String) extends Actor { override def receive: Receive = { case Message(msg) => { if (msg.startsWith(filter)) { sender() ! Message(msg) } } } } object FilterEchoActor { def props(filter: String) = { akka.actor.Props(new FilterEchoActor(filter)) } case class Message(msg: String) }
import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import jyuch.FilterEchoActor.Message import org.scalatest.{MustMatchers, WordSpecLike} class FilterEchoActorTest extends TestKit(ActorSystem("testsystem")) with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll { "FilterEchoActor" must { "return message when message starts with provided string" in { val echoActor = system.actorOf(FilterEchoActor.props("Hello")) echoActor ! Message("Hello world") echoActor ! Message("Hello jyuch") echoActor ! Message("Hoge world") echoActor ! Message("Hoge jyuch") echoActor ! Message("Hello world") val msg = receiveN(3) msg must be(Vector(Message("Hello world"), Message("Hello jyuch"), Message("Hello world"))) } } }
ImplicitSender
トレイトをmix-inするとメッセージの送信者を暗黙的にtestActor
にしてくれます。
フィルタの条件を指定してActorを生成しメッセージを送信し、送り返されたメッセージが望んだ結果かをアサートしています。
おわりに
余談ってほど余談ではないのですが、Actorにコンストラクタ引数で何かしらのパラメータを渡したいときはコンパニオンオブジェクトにファクトリメソッドを定義する方法が推奨されているようです。
また、この記事を書いているときに思いついたのですが、今年の目標に「しょうもない小ネタを挟むために時間を使わない」も付け加えておきます。
元旦