Akka.NETでもStreamsでUDPを使いたい
はじめに
Nignxから吐き出されるSyslogを受け取ってデータベースにぶち込みたいけどその為だけにFluentdをインストールもしたくないしそもそもWindowsでSyslogを扱うのは色々つらみポイントが高いんだよねってことで、Akka.NET Streamsで流し込みたいけど標準で用意されているSourceにUDPないじゃんってことで、とりあえずUDPをSourceで扱う方法を確認してみました。
ちなみに元ネタは↓です。
Reactive Streamsの規格に準拠するのは割と難しいのでActorPublisher<T>
を使うのはやめんしゃいとのことなので、まぁ、この記事は参考にしないほうがいいんじゃないかな(ポロローン
UDP
using Akka; using Akka.Actor; using Akka.Event; using Akka.IO; using Akka.Streams.Actors; using System; using System.Collections.Generic; using System.Net; using Akka.Streams.Dsl; namespace AkkaStreamsUdp { public class UdpSource : Actor<200b>Publisher<Udp.Received> { public static Props Props(EndPoint listenOn, int maxBufferSize) => Akka.Actor.Props.Create(() => new UdpSource(listenOn, maxBufferSize)); public static Source<Udp.Received, IActorRef> Create(EndPoint listenOn, int maxBufferSize) => Source.ActorPublisher<Udp.Received>(Props(listenOn, maxBufferSize)); private Queue<Udp.Received> datagrams = new Queue<Udp.Received>(); private readonly int maxBufferSize; private readonly ILoggingAdapter log = Logging.GetLogger(Context); public UdpSource(EndPoint listenOn, int maxBufferSize) { if (maxBufferSize <= 0) { throw new ArgumentOutOfRangeException(nameof(maxBufferSize)); } this.maxBufferSize = maxBufferSize; Context.System.Udp().Tell(new Udp.Bind(Self, listenOn)); } protected override bool Receive(object message) { return message .Match() .With<Udp.Bound>(() => Context.Become(Ready(Sender))) .With<Cancel>(() => Context.Stop(Self)) .WasHandled; } private Receive Ready(IActorRef socket) { return (object message) => { return message .Match() .With<Udp.Received>(r => { if (datagrams.Count >= maxBufferSize) { log.Warning($"Datagram buffer size {maxBufferSize} exceeded"); Context.Become(Suspend(socket)); } else if (datagrams.Count == 0 && TotalDemand > 0) { OnNext(r); } else { datagrams.Enqueue(r); Deliver(); } }) .With<Request>(() => Deliver()) .With<Udp.Unbind>(() => socket.Tell(Udp.Unbind.Instance)) .With<Udp.Unbound>(() => OnCompleteThenStop()) .With<Cancel>(() => socket.Tell(Udp.Unbind.Instance)) .WasHandled; }; } private Receive Suspend(IActorRef socket) { return (object message) => { return message .Match() .With<Udp.Received>(() => log.Debug("Dropping UDP datagram while suspended")) .With<Request>(() => { Deliver(); log.Info("Datagram buffer size is ok, resuming"); Context.Become(Ready(socket)); }) .WasHandled; }; } private void Deliver() { while (TotalDemand > 0 && datagrams.Count > 0) { var elem = datagrams.Dequeue(); OnNext(elem); } } } }
んで、こんな感じで使えばokです。
using Akka.Actor; using Akka.Streams; using System; using System.Net; using System.Text; namespace AkkaStreamsUdp { class Program { static void Main(string[] args) { using (var system = ActorSystem.Create("udp-test")) using (var materializer = system.Materializer()) { var source = UdpSource.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6000), 100); source.RunForeach(r => { Console.WriteLine(r.Data.ToString(Encoding.UTF8)); }, materializer); Console.ReadLine(); } } } }
ペイロードをUTF-8としてデコードしてひたすらコンソールに吐き出し続けます。
おわりに
とりあえずUDPを受け取って下流に流すことが出来たので、あとは途中のFlowとSinkをいい感じにごにょごにょすればデータベースに突っ込めそうです。
おわり