C#でもノンブロッキングでTCPしたい
はじめに
特に需要があったわけではないのですが、C#からTCPをノンブロッキングで扱う方法を調べてみました。
普通に『C# TCP ノンブロッキング』でググるとこのドキュメントが出てきますが、コールバック祭りはつらみポイントがかなり高いので我らがAkka.NET Streamsで処理する方法を確認してみました。
というよりエコーするだけでこの分量とかヤバすぎでしょ。
Streams
今回のコードは改行で区切られた数字を受け取り、加算したのち加算した値を返すコードを書いてみます。
また、時間がかかる処理のエミュレートとしてTask.Delay
を挟みます。
using Akka.Actor; using Akka.IO; using Akka.Streams; using Akka.Streams.Dsl; using System; using System.Linq; using System.Threading; using System.Threading.Tasks; using Tcp = Akka.Streams.Dsl.Tcp; namespace AkkaStreamTcp { class Program { private static int i = 0; static void Main(string[] args) { var system = ActorSystem.Create("akka-stream-tcp"); var materializer = system.Materializer(); Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> connections = system.TcpStream().Bind("localhost", 8888); connections.RunForeach(connection => { Console.WriteLine($"New connection from: {connection.RemoteAddress}"); var echo = Flow.Create<ByteString>() .Via(Framing.Delimiter( ByteString.FromString("\n"), maximumFrameLength: 256, allowTruncation: true)) .Select(c => c.ToString()) .SelectAsync(1, Sum) .Select(ByteString.FromString); connection.HandleWith(echo, materializer); }, materializer); Console.ReadLine(); } static async Task<string> Sum(string value) { if (int.TryParse(value, out int n)) { Interlocked.Add(ref i, n); } await Task.Delay(1000); return $"{i,-11}"; } } }
クライアント側は多数のクライアントのエミュレートとしてBroadcastPool
を使用してActorを100倍界王拳として同時接続を行います。
using Akka.Actor; using Akka.Routing; using System; using System.Diagnostics; using System.Net.Sockets; using System.Text; namespace AkkaStreamTcpClient { class Program { static void Main(string[] args) { var system = ActorSystem.Create("client"); var props = Props.Create<ClientActor>().WithRouter(new BroadcastPool(100)).WithDispatcher("default-fork-join-dispatcher"); var actor = system.ActorOf(props, "client"); for (var i = 0; i < 100; i++) { actor.Tell(10); } Console.ReadLine(); } } public class ClientActor : ReceiveActor { private Guid id; public ClientActor() { id = Guid.NewGuid(); ReceiveAsync<int>(async i => { var sw = Stopwatch.StartNew(); using (var client = new TcpClient("localhost", 8888)) using (var stream = client.GetStream()) { var message = Encoding.ASCII.GetBytes($"{i,-11}\n"); await stream.WriteAsync(message, 0, message.Length); await stream.FlushAsync(); var response = new byte[11]; await stream.ReadAsync(response, 0, 11); var n = Encoding.ASCII.GetString(response); Console.WriteLine($"{id} {n} {sw.ElapsedMilliseconds}"); } }); } } }
48b9c0da-b281-40eb-8ed5-1984ef11ab53 100000 1036 26f112cc-15d6-44d5-9502-fae81dcfda25 100000 1038 aa56fb77-6421-49e0-9c3a-b42019100fde 100000 1037 2db39b91-d205-4980-ac57-0e93f0c9c6ec 100000 1022 ed7aea6e-9915-4fed-9b13-a65f679d2da4 100000 1012
処理分の遅延だけで各接続が処理出来ているようです。
ちなみにTask.Delay
をThread.Sleep
に変えると
bade5a2a-6f14-4d31-ba42-02d7dca6fa47 100000 1793 a777e9d3-6bac-4f2c-b947-474abd2a397f 100000 1924 20ae8e25-3a71-4567-b893-0deb50fb5a3d 100000 1951 0d29f288-22e9-4567-bc03-fa16bd220dfd 100000 1802 05ca3475-f7c2-403a-a42f-0378c9c9ebd4 100000 1782
と各接続の処理時間が伸びています。まぁそうだよねスレッドをブロックしたらまずいよね
おわりに
Akka.NETつよい