僕らが本当に欲しかったのはStream APIだったのか?

TL;DR

  • 要素シーケンスとしてjava.lang.Iterable<T>がすでにあったのに、さらにjava.util.stream.Stream<T>を導入してまでシーケンスに対しての順次及び並列処理を等価的に扱えるようにする*1必要があったの?
    • 実務レベルで考えても順次に処理するのがほとんどで、並列で処理することなんて無くない?
      • 実務でStream APIを並列でギュイィィンしてる例ってどのくらいあるの?
  • 理想を言えば順次と並列でコードが変わらないのがいいけど、あくまで理想じゃない?
    • 今まで順次で動いていたコードに.parallelStream()を差し込んだ時に事故らない保証をするのはかなり大変じゃない?
  • 並列に動かせるとして、どの処理がどのスレッドに対してどのように割り当てられるかを説明出来る人は少なそう
    • 並列目的ならAkka Streamsとか導入したほうが良くない?

結論 ⇒ あくまでシーケンスに対する順次な集合操作を提供するに留めておいたほうが実際のニーズに合っていたのでは?

はじめに

最近バックトゥザJavaしているのですが、JavaのStream APIを調べれば調べるほどコレジャナイ感を感じていたのでそれについてです。

ちなみにここでの引用はJavadocjava.util.streamから引用しています。とりあえず公式ドキュメントを読む。ハンムラビ法典にも書いてあります。

docs.oracle.com

Stream APILinq to Object

弊社的にはStream APIへは.NETのLinqのようなコレクションへの集合操作を提供するAPIを期待していました。

ネット上の紹介記事でもそれを期待する記事が多く、並列処理はどちらかというと『まぁこういうことも出来るよね』ぐらいのニュアンスの記事が多かった印象です。

ところがぎっちょん同じStream<T>インターフェースで並列(マルチスレッド)で動作されうるため、あらゆる箇所でスレッドセーフ性を要求してくるという割とシャレにならない難易度なAPIなんじゃね?っていう感じです。

いや、分かってる人だったらそんな地雷は踏まないという感じなんでしょうけど、玉石混交なプロジェクトではスレッドセーフデストロイヤーなんてごまんと居ますし仮にマルチスレッドに係るバグを踏まれた場合は一瞬で不夜城と化すのでクソオブクソ

順次

for (int i = 0; i < 100; i++) {
    source.add(String.format("item%d", i));
}

Stream<String> stream = source
        .stream()
        .map(it -> {
            System.out.println("map: " + Thread.currentThread().getName() + " " + it);
            return it;
        });

stream.forEach(it -> {
    System.out.println("forEach: " + Thread.currentThread().getName() + " " + it);
});

この場合、出力結果はとても単純です。

map: main item0
forEach: main item0
map: main item1
forEach: main item1
...

JDKのストリーム実装は、並列性が明示的に要求されない限り、順次ストリームを作成します。

とあるように明示的に.parallelStream()を呼ばない限り順次ストリームを作成します。わかりやすいですね

並列

では、.stream().parallelStream()に変更するとどの様に動作するでしょうか。

          +--> map() --+
A:        |            |
1, 2, 3 --+--> map() --+----> forEach()
          |            |
          +--> map() --+


          +--> map() --+  +-> forEach()
B:        |            |  |
1, 2, 3 --+--> map() --+--+-> forEach()
          |            |  |
          +--> map() --+  +-> forEach()


C:
1 -----------> map() -------> forEach()

2 -----------> map() -------> forEach()

3 -----------> map() -------> forEach()

Aで動いてほしいですが、正解はCです。まじかよ

と言うわけでforEach(Consumer<? super T>)は本質的にスレッドセーフではありません。

void forEach(Consumer<? super T> action)

このストリームの各要素に対してアクションを実行します。

これは終端操作です。

この操作の動作は明らかに非決定論的です。並列ストリーム・パイプラインの場合、この操作は、ストリームの検出順序を考慮することを保証しません。保証すると並列性のメリットが犠牲になるからです。与えられた任意の要素に対し、ライブラリが選択した任意のタイミングで任意のスレッド内でアクションが実行される可能性があります。アクションが共有状態にアクセスする場合、必要な同期を提供する責任はアクションにあります。

上の例を走らせてみても、動作する順序及び起動されるスレッドはバラバラになります。

map: main item72
forEach: main item72
map: ForkJoinPool.commonPool-worker-4 item82
forEach: ForkJoinPool.commonPool-worker-6 item7
map: ForkJoinPool.commonPool-worker-15 item44
...

.forEachOrdered()で動かせば順番かつ同じスレッドで動作してくれますが、それでもメインスレッド以外のスレッドで動作する可能性があることには変わりません。 また、中間操作は相変わらずバラバラなスレッドで動作します。

...
map: ForkJoinPool.commonPool-worker-15 item99
map: ForkJoinPool.commonPool-worker-10 item75
forEach: ForkJoinPool.commonPool-worker-3 item1
forEach: ForkJoinPool.commonPool-worker-3 item2
...
forEach: ForkJoinPool.commonPool-worker-3 item98
forEach: ForkJoinPool.commonPool-worker-3 item99

さらに厄介なのが、.stream().parallelStream()も返すのは同じStream<T>なんですよね。型レベルでそいつの動作モードが順序か並列かが分からない。

static void someProcess(Stream<String> stream) {
    // 僕のあずかり知らないところで勝手にマルチスレッドで動作するのがキニクワナイ
    stream.filter(it -> it.length() > 10).forEach(System.out::println);
}

僕らが本当に欲しかったもの

これは弊社の考えなので主語を大きくするのはアレなのですが、僕らが本当に欲しかったものはコレクション(シーケンス)を宣言的に操作できるライブラリであってもっと具体的に言ってしまうとJava版のLinq to Objectだったんじゃないかなと思うんです。

// 弊社の妄想で書いたこんな感じのサムシングが欲しかったの図
// 文字列のシーケンスから10文字以上でかつAから始まるものを文字数毎に最大10個ずつ表示させるコード
Iterable<String> source = new ArrayList<>(/* some initialize */);

Iterable<KeyValuePair<Integer, Iterable<String>>> grouped = source
        .filter(it -> it.length() >= 10)
        .filter(it -> it.startsWith("A"))
        .map(it -> Tuple.Create<Integer, String>(it.length(), it))
        .groupBy(it -> it._1, it -> it._2)

for(KeyValuePair<Integer, Iterable<String>>g: grouped) {
    System.out.println(g.key);

    for(String it: g.value.take(10)) {
        System.out.println(it);
    }
}

んで、ストリームをパラレルでギュイィィンさせたかったらAkka Streamとか導入すればいいのでは?という感じです。

doc.akka.io

いろんなサードパーティがStream APIの実装を提供してくれて、データベースからKafkaのようなメッセージキューから統一的なAPIかつマルチスレッドでギュイィィン出来るのが理想だったのでしょうけど、実際はデータベースへの接続を提供する実装が1つあるくらいな感じです。*2

そんだったらフツーにAkka Streams + Alpakkaの方が便利じゃんになるわけです。

おわりに

簡単に並列化できるなんて大概幻想なので、みんな闇落ちしてAkka Streamsに手を染めましょう!!!!11111

おわり

*1:Streamが順次と並列を扱えることからこの意図を感じた。どっかに書いてあったわけではない。

*2:どちらかと言うとORM的な側面が強そうだから実は違う?