C#でもノンブロッキングでTCPしたい

はじめに

特に需要があったわけではないのですが、C#からTCPをノンブロッキングで扱う方法を調べてみました。

普通に『C# TCP ノンブロッキング』でググるこのドキュメントが出てきますが、コールバック祭りはつらみポイントがかなり高いので我らがAkka.NET Streamsで処理する方法を確認してみました。

というよりエコーするだけでこの分量とかヤバすぎでしょ。

Streams

今回のコードは改行で区切られた数字を受け取り、加算したのち加算した値を返すコードを書いてみます。

また、時間がかかる処理のエミュレートとしてTask.Delayを挟みます。

using Akka.Actor;
using Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Tcp = Akka.Streams.Dsl.Tcp;

namespace AkkaStreamTcp
{
    class Program
    {
        private static int i = 0;

        static void Main(string[] args)
        {
            var system = ActorSystem.Create("akka-stream-tcp");
            var materializer = system.Materializer();

            Source<Tcp.IncomingConnection, Task<Tcp.ServerBinding>> connections =
                system.TcpStream().Bind("localhost", 8888);

            connections.RunForeach(connection =>
            {
                Console.WriteLine($"New connection from: {connection.RemoteAddress}");

                var echo = Flow.Create<ByteString>()
                    .Via(Framing.Delimiter(
                        ByteString.FromString("\n"),
                        maximumFrameLength: 256,
                        allowTruncation: true))
                    .Select(c => c.ToString())
                    .SelectAsync(1, Sum)
                    .Select(ByteString.FromString);

                connection.HandleWith(echo, materializer);
            }, materializer);

            Console.ReadLine();
        }

        static async Task<string> Sum(string value)
        {
            if (int.TryParse(value, out int n))
            {
                Interlocked.Add(ref i, n);
            }

            await Task.Delay(1000);
            return $"{i,-11}";
        }
    }
}

クライアント側は多数のクライアントのエミュレートとしてBroadcastPoolを使用してActorを100倍界王拳として同時接続を行います。

using Akka.Actor;
using Akka.Routing;
using System;
using System.Diagnostics;
using System.Net.Sockets;
using System.Text;

namespace AkkaStreamTcpClient
{
    class Program
    {
        static void Main(string[] args)
        {
            var system = ActorSystem.Create("client");
            var props = Props.Create<ClientActor>().WithRouter(new BroadcastPool(100)).WithDispatcher("default-fork-join-dispatcher");
            var actor = system.ActorOf(props, "client");

            for (var i = 0; i < 100; i++)
            {
                actor.Tell(10);
            }

            Console.ReadLine();
        }
    }

    public class ClientActor : ReceiveActor
    {
        private Guid id;

        public ClientActor()
        {
            id = Guid.NewGuid();

            ReceiveAsync<int>(async i =>
            {
                var sw = Stopwatch.StartNew();

                using (var client = new TcpClient("localhost", 8888))
                using (var stream = client.GetStream())
                {
                    var message = Encoding.ASCII.GetBytes($"{i,-11}\n");
                    await stream.WriteAsync(message, 0, message.Length);
                    await stream.FlushAsync();
                    var response = new byte[11];
                    await stream.ReadAsync(response, 0, 11);
                    var n = Encoding.ASCII.GetString(response);
                    Console.WriteLine($"{id} {n} {sw.ElapsedMilliseconds}");
                }
            });
        }
    }
}
48b9c0da-b281-40eb-8ed5-1984ef11ab53 100000      1036
26f112cc-15d6-44d5-9502-fae81dcfda25 100000      1038
aa56fb77-6421-49e0-9c3a-b42019100fde 100000      1037
2db39b91-d205-4980-ac57-0e93f0c9c6ec 100000      1022
ed7aea6e-9915-4fed-9b13-a65f679d2da4 100000      1012

処理分の遅延だけで各接続が処理出来ているようです。

ちなみにTask.DelayThread.Sleepに変えると

bade5a2a-6f14-4d31-ba42-02d7dca6fa47 100000      1793
a777e9d3-6bac-4f2c-b947-474abd2a397f 100000      1924
20ae8e25-3a71-4567-b893-0deb50fb5a3d 100000      1951
0d29f288-22e9-4567-bc03-fa16bd220dfd 100000      1802
05ca3475-f7c2-403a-a42f-0378c9c9ebd4 100000      1782

と各接続の処理時間が伸びています。まぁそうだよねスレッドをブロックしたらまずいよね

おわりに

Akka.NETつよい

github.com