Akka.NET Streamsを流行らせるとまではいかなくても、それなりに知名度を上げたい
はじめに
このブログで過去何度かAkka.NET Streamsネタを取り上げますが、日本語でのAkka.NET Streamsの記事って本ブログを含めても両手で収まっちゃうんじゃないかなってくらい少ないんですよね。 記事が少ないと知るきっかけも少なくなり、興味を持つ人が少なくなって記事も少ないままという悪循環になります。
そんな日本では割と無名なAkka.NET Streams、しいてはAkka.NETについて興味を持って頂くきっかけになればと思います。
逆にReactiveってキーワードがかぶっているReactive Extensions for .NET(Rx)はすごく流行っており、その人気っぷりに嫉妬せざる負えません。
そもそもAkka.NET Streamsって?
Lightbend社が開発しているアクターモデルを使用したJVM向け*1の並行・分散処理ライブラリにAkkaというものがあり、それの.NETへポーティングしたのがAkka.NETです。 そして、Akka.NETを使用したReactive Streamsの実装がAkka.NET Streamsになります。
アクターモデルとは、それぞれメールボックスと呼ばれるメッセージを受け取る機構を備えた独立した計算実体であるアクターがお互いに非同期にメッセージの送受信を行い、受け取ったメッセージに応じて処理を行う事でアクターが並行に動作するモデルだそうです。
アクターがメッセージを送受信して動作するという直観的にわかりやすいメンタルモデルに反して、それを具体的な業務システムに落とし込もうとすると既存の設計手法が全く使えない。というのが理由かどうかは知りませんが、Akka.NETは.NET界隈では死ぬほど流行っていないです。
Reactive Streamsの説明については竹添さんの記事が分かりやすいです。
Akka.NET Streamsは裏でアクターモデルを使用しているとはいえアクターを常に意識しながら書くわけではなく、普通のC#を書く感覚の延長で使用可能なのでまぁまぁ使いやすいです。
さっき言っていたRxとの違いは?
弊社はRxを使ったことがないので正直わかりません(おい
厳密な説明ではないのですが、Akka.NET Streamsはバックプレッシャーを効かせてメモリに乗り切らないサイズのデータを非同期を駆使してCPUの全コアを天井に張り付かせつつHDDのランプを常にちっかちかさせながらあびゃーって処理するためのライブラリです。対してRxは画面とかのUIイベントをストリームとして操作することに興味があるようです。*2
ということで根本的に適用範囲が異なります。
ちなみにJVMのほうはインターフェースが統一されておりRxJavaや本家Akka Streamsとの間で相互運用が可能なようですが、RxとAkka.NET Streamsとの間は無理っぽいです。ざんねん
でも.NETにもIEnumerable
とTask
があるじゃん それらを使えば非同期かつストリームに処理できるのでは?
C#は言語レベルで非同期処理をサポートしているため、単純なケースならほとんどの場合で何とかなるんじゃないでしょうか。 しかし、そこまで単純なケースはあまり多くはないでしょう。
static async Task<int> VeryHeavyProcess(int i) { await Task.Delay(1000); // 重い処理の代わり return i * 2; } static async Task<int> UseEnumerable() { var sum = 0; foreach (var i in Enumerable.Range(0, 100)) { sum += await VeryHeavyProcess(i); // (1) } return sum; } static Task<int> UseStreams(ActorMaterializer materializer) { Source<int, NotUsed> source = Source.From(Enumerable.Range(0, 100)); Flow<int, int, NotUsed> flow = Flow.Create<int>().SelectAsync(1, VeryHeavyProcess); // (2) Sink<int, Task<int>> sink = Sink.Sum<int>((a, b) => a + b); IRunnableGraph<Task<int>> r = source.Via(flow).ToMaterialized(sink, Keep.Right); return r.Run(materializer); }
00:01:40.0708893 00:01:40.2291617 9900 9900
ここまで単純な例だと逆にUseStreams
の方が余計ごちゃっとしているように感じますが、まぁ例なので。
VeryHeavyProcess(int)
は純粋なので並列で動作させても結果は同じです。弊社のPCは物理6コアなので6つ並列で動作させましょう。
(2)を
// 設定ファイル経由でActor1つに占有スレッドを割り当てる // custom-dedicated-dispatcher { // type = PinnedDispatcher // } Flow<int, int, NotUsed> flow = Flow.Create<int>().SelectAsync(6, VeryHeavyProcess) .WithAttributes(ActorAttributes.CreateDispatcher("custom-dedicated-dispatcher")); // (2)
00:01:40.0556817 00:00:17.1568275 9900 9900
に変えるだけで6スレッド*3で動作してくれます。
じゃあ(1)を並列で動作させるにはどうしたらいいでしょう。
PLINQ?気合でTask
を待ち受ける?今のコードを崩す必要が出てきます。
スレッドを駆使して頑張る?それではTPL登場以前に逆戻りです。
少しの変更で各ステージを並列で動作させることが出来ます。そう、Akka.NET Streamsならね*4
他言語由来だと命名その他がアレな事があるけど、その辺はどうなの?
もともとはMap
やFilter
などのScala由来の命名がされていましたが、#1935でLINQ・Rxに近い命名に変更されたのでLINQが分かればIntellSense™に表示される説明を読めば何となく使い方がわかるようになっています。
「モ」から始まる名前を言ってはいけない概念は出てきませんので安心して使えます。
また、IEnumerable
やTask
などの.NETのエコシステムに乗っかれるものはちゃんと乗っかっているので違和感はほとんど感じないでしょう。
.NET Framework? .NET Core?
.NET Framework 4.5もしくは.NET Standard 1.6が対応プラットフォームの為、どちらでも使用可能です。
その為どちらでも好きな方を選べばいいとおもいますが、バッチ処理等がメインの適用先となりGUIが要らない事がほとんどだと思うので.NET Coreで書いたほうがいろんなOSで動くと思うのでお得感があります。
使ってみたいんだけど、どこから始めればいいの?
とりあえず.NET Coreのコンソールアプリケーションをテンプレートから生成します。
> dotnet new console
そのあとはnuget経由でAkka.NET Streamsをインストールします。
> dotnet add package Akka.Streams --version 1.3.8
Akka.NET Streamsでよく使う要素を1通り使ってみました。
using System; using System.Linq; using System.Threading.Tasks; using Akka; using Akka.Actor; using Akka.Streams; using Akka.Streams.Dsl; namespace Akkanet.Streams.Template { class Program { static void Main(string[] args) { var result = Stream(); result.Wait(); Console.WriteLine(result.Result); } static Task<int> Stream() { // 下流に各要素を放出する Source Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); // 上流から流れてきた要素を足し合わせる Sink Sink<int, Task<int>> sink = Sink.Aggregate<int, int>(0, (l, r) => l + r); // Stream を正常(もしくはキャンセル扱いで)に停止させるための KillSwitch Flow<int, int, UniqueKillSwitch> killSwitch = Flow.Create<int>().ViaMaterialized(KillSwitches.Single<int>(), Keep.Right); // Stream(の特定の部分)を通過する要素の流量を制御するための Throttle Flow<int, int, NotUsed> throttle = Flow.Create<int>().Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping); // Stream を動作させる Actor をホストする ActorSystem ActorSystem system = ActorSystem.Create("akkanet"); // ActorSystem を使用して Stream をマテリアル化するマテリアライザ ActorMaterializer materializer = ActorMaterializer.Create(system); // Source、Sink、Throttle、KillSwitch を使用して RunnableGraph(実行可能なグラフ)を組み立てる IRunnableGraph<Tuple<UniqueKillSwitch, Task<int>>> runnable = source .Via(throttle) .ViaMaterialized(killSwitch, Keep.Right) .ToMaterialized(sink, Keep.Both); // RunnableGraph をマテリアライズして Stream を作動させる var (ks, mat) = runnable.Run(materializer); // 10秒後に KillSwitch を使用して Stream を途中で停止させる(完了扱い) ICancelable canceller = materializer.ScheduleOnce(TimeSpan.FromSeconds(10), () => { Console.WriteLine("Stream is cancelled"); ks.Shutdown(); }); // Stream 完了後に ActorSystem と ActorMaterializer を破棄する return mat.ContinueWith(prev => { canceller.Cancel(); materializer.Dispose(); system.Dispose(); return prev.Result; }); } } }
1から100までの値を下流に放流するSource
、上流から流れてきた値を加算するSink
、その地点の流量を制御するための組み込みのFlow
であるThrottle
、Streamを終了させるためのKillSwitch
から構成されています。
1から100まで放流するのに約100秒かかりますが、約10秒後にKillSwitchが動作して下流への流れを阻害しますので結果としてStreamは10秒後に停止します。タイミングに依存しますが、結果は大体55位になるはずです。
おわりに
日本語資料が絶望的なのでいきなり英語の資料を漁るか、いきなりソースコードリーディングをしないといけない部分があり初期のつらみポイントは割と高めですが、うまく適用範囲に嵌ればStreamsの名を冠する通り、ストリーム系の処理では無類の強さを発揮します。
もちろんVBでも使用することが出来るのでVBerもにっこりです。