Akka StreamsでもMyBatisを使いたい

はじめに

正直かなりニッチな需要だと思うのですが、Akka StreamsのMyBatis用コネクタを実装したのでそのお話です。

github.com

JavaでAkka Streamsを使ったときにデータベースへのアクセスを楽にしたかったのが動機ですが、JavaからAkka Streamsを使うこと自体がまぁ、その、ねぇ

インストール

Maven Centralにアーティファクトが登録されていますので、お好きなビルドツールで指定するだけで依存関係が降ってきます。

<dependency>
  <groupId>dev.jyuch</groupId>
  <artifactId>alpakka-mybatis_2.12</artifactId>
  <version>0.1.0</version>
</dependency>
implementation 'dev.jyuch:alpakka-mybatis_2.12:0.1.0'
libraryDependencies += "dev.jyuch" %% "alpakka-mybatis" % "0.1.0"

つかいかた

public class User {
    private int id;
    private String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }
}

public interface UserMapper {
    int initialize();
    Cursor<User> select();
    User selectById(int id);
    int insert(User user);
}

が定義されており、MyBatisが適切に構成されているものとします。

Source

public static void source(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<User, CompletionStage<IOResult>> source = MyBatis.source(
            sqlSessionFactory::openSession,
            sqlSession -> sqlSession.getMapper(UserMapper.class).select()
    );

    /*
     * User{id=1, name='alice'}
     * User{id=2, name='bob'}
     */
    CompletionStage<Done> onComplete = source.runForeach(System.out::println, system);
    onComplete.toCompletableFuture().get();
}

Sourceはカーソルからひたすらストリームに流し込む設計にしています。 カーソルが一度にフェッチするサイズはMyBatis側で制御してください。

また、セッションはストリーム終了後にクローズする設計としています。

Flow

public static void flow(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2));
    Flow<Integer, String, CompletionStage<IOResult>> flow = MyBatis.flow(
            sqlSessionFactory::openSession,
            (session, i) -> {
                UserMapper mapper = session.getMapper(UserMapper.class);
                User user = mapper.selectById(i);
                return user.getName();
            },
            false
    );

    /*
     * alice
     * bob
     */
    CompletionStage<Done> onComplete = source.via(flow).runForeach(System.out::println, system);
    onComplete.toCompletableFuture().get();
}

上流から流れてきたアイテムに対してセッションを用いて処理を行うようになっています。

毎回マッパーを生成するのはどうかなと思いましたが、まぁその辺はgroupedを使ってまとめて処理するようにすればいい感じに性能が出てくれると思っています。

Sink

public static void sink(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<User, NotUsed> source = Source.from(
            Arrays.asList(
                    new User(3, "Carol"),
                    new User(3, "Dave")));
    Sink<User, CompletionStage<IOResult>> sink = MyBatis.sink(
            sqlSessionFactory::openSession,
            (session, it) -> {
                UserMapper mapper = session.getMapper(UserMapper.class);
                mapper.insert(it);
            },
            true
    );
    CompletionStage<IOResult> onComplete = source.toMat(sink, Keep.right()).run(system);
    onComplete.toCompletableFuture().get();

    SqlSession session = sqlSessionFactory.openSession();
    UserMapper mapper = session.getMapper(UserMapper.class);

    /*
     * User{id=1, name='alice'}
     * User{id=2, name='bob'}
     * User{id=3, name='Carol'}
     * User{id=3, name='Dave'}
     */
    for (User it : mapper.select()) {
        System.out.println(it);
    }
}

こちらも基本的な考え方はFlowと同じです。

github.com

おわり

僕らが本当に欲しかったのはStream APIだったのか?

TL;DR

  • 要素シーケンスとしてjava.lang.Iterable<T>がすでにあったのに、さらにjava.util.stream.Stream<T>を導入してまでシーケンスに対しての順次及び並列処理を等価的に扱えるようにする*1必要があったの?
    • 実務レベルで考えても順次に処理するのがほとんどで、並列で処理することなんて無くない?
      • 実務でStream APIを並列でギュイィィンしてる例ってどのくらいあるの?
  • 理想を言えば順次と並列でコードが変わらないのがいいけど、あくまで理想じゃない?
    • 今まで順次で動いていたコードに.parallelStream()を差し込んだ時に事故らない保証をするのはかなり大変じゃない?
  • 並列に動かせるとして、どの処理がどのスレッドに対してどのように割り当てられるかを説明出来る人は少なそう
    • 並列目的ならAkka Streamsとか導入したほうが良くない?

結論 ⇒ あくまでシーケンスに対する順次な集合操作を提供するに留めておいたほうが実際のニーズに合っていたのでは?

はじめに

最近バックトゥザJavaしているのですが、JavaのStream APIを調べれば調べるほどコレジャナイ感を感じていたのでそれについてです。

ちなみにここでの引用はJavadocjava.util.streamから引用しています。とりあえず公式ドキュメントを読む。ハンムラビ法典にも書いてあります。

docs.oracle.com

Stream APILinq to Object

弊社的にはStream APIへは.NETのLinqのようなコレクションへの集合操作を提供するAPIを期待していました。

ネット上の紹介記事でもそれを期待する記事が多く、並列処理はどちらかというと『まぁこういうことも出来るよね』ぐらいのニュアンスの記事が多かった印象です。

ところがぎっちょん同じStream<T>インターフェースで並列(マルチスレッド)で動作されうるため、あらゆる箇所でスレッドセーフ性を要求してくるという割とシャレにならない難易度なAPIなんじゃね?っていう感じです。

いや、分かってる人だったらそんな地雷は踏まないという感じなんでしょうけど、玉石混交なプロジェクトではスレッドセーフデストロイヤーなんてごまんと居ますし仮にマルチスレッドに係るバグを踏まれた場合は一瞬で不夜城と化すのでクソオブクソ

順次

for (int i = 0; i < 100; i++) {
    source.add(String.format("item%d", i));
}

Stream<String> stream = source
        .stream()
        .map(it -> {
            System.out.println("map: " + Thread.currentThread().getName() + " " + it);
            return it;
        });

stream.forEach(it -> {
    System.out.println("forEach: " + Thread.currentThread().getName() + " " + it);
});

この場合、出力結果はとても単純です。

map: main item0
forEach: main item0
map: main item1
forEach: main item1
...

JDKのストリーム実装は、並列性が明示的に要求されない限り、順次ストリームを作成します。

とあるように明示的に.parallelStream()を呼ばない限り順次ストリームを作成します。わかりやすいですね

並列

では、.stream().parallelStream()に変更するとどの様に動作するでしょうか。

          +--> map() --+
A:        |            |
1, 2, 3 --+--> map() --+----> forEach()
          |            |
          +--> map() --+


          +--> map() --+  +-> forEach()
B:        |            |  |
1, 2, 3 --+--> map() --+--+-> forEach()
          |            |  |
          +--> map() --+  +-> forEach()


C:
1 -----------> map() -------> forEach()

2 -----------> map() -------> forEach()

3 -----------> map() -------> forEach()

Aで動いてほしいですが、正解はCです。まじかよ

と言うわけでforEach(Consumer<? super T>)は本質的にスレッドセーフではありません。

void forEach(Consumer<? super T> action)

このストリームの各要素に対してアクションを実行します。

これは終端操作です。

この操作の動作は明らかに非決定論的です。並列ストリーム・パイプラインの場合、この操作は、ストリームの検出順序を考慮することを保証しません。保証すると並列性のメリットが犠牲になるからです。与えられた任意の要素に対し、ライブラリが選択した任意のタイミングで任意のスレッド内でアクションが実行される可能性があります。アクションが共有状態にアクセスする場合、必要な同期を提供する責任はアクションにあります。

上の例を走らせてみても、動作する順序及び起動されるスレッドはバラバラになります。

map: main item72
forEach: main item72
map: ForkJoinPool.commonPool-worker-4 item82
forEach: ForkJoinPool.commonPool-worker-6 item7
map: ForkJoinPool.commonPool-worker-15 item44
...

.forEachOrdered()で動かせば順番かつ同じスレッドで動作してくれますが、それでもメインスレッド以外のスレッドで動作する可能性があることには変わりません。 また、中間操作は相変わらずバラバラなスレッドで動作します。

...
map: ForkJoinPool.commonPool-worker-15 item99
map: ForkJoinPool.commonPool-worker-10 item75
forEach: ForkJoinPool.commonPool-worker-3 item1
forEach: ForkJoinPool.commonPool-worker-3 item2
...
forEach: ForkJoinPool.commonPool-worker-3 item98
forEach: ForkJoinPool.commonPool-worker-3 item99

さらに厄介なのが、.stream().parallelStream()も返すのは同じStream<T>なんですよね。型レベルでそいつの動作モードが順序か並列かが分からない。

static void someProcess(Stream<String> stream) {
    // 僕のあずかり知らないところで勝手にマルチスレッドで動作するのがキニクワナイ
    stream.filter(it -> it.length() > 10).forEach(System.out::println);
}

僕らが本当に欲しかったもの

これは弊社の考えなので主語を大きくするのはアレなのですが、僕らが本当に欲しかったものはコレクション(シーケンス)を宣言的に操作できるライブラリであってもっと具体的に言ってしまうとJava版のLinq to Objectだったんじゃないかなと思うんです。

// 弊社の妄想で書いたこんな感じのサムシングが欲しかったの図
// 文字列のシーケンスから10文字以上でかつAから始まるものを文字数毎に最大10個ずつ表示させるコード
Iterable<String> source = new ArrayList<>(/* some initialize */);

Iterable<KeyValuePair<Integer, Iterable<String>>> grouped = source
        .filter(it -> it.length() >= 10)
        .filter(it -> it.startsWith("A"))
        .map(it -> Tuple.Create<Integer, String>(it.length(), it))
        .groupBy(it -> it._1, it -> it._2)

for(KeyValuePair<Integer, Iterable<String>>g: grouped) {
    System.out.println(g.key);

    for(String it: g.value.take(10)) {
        System.out.println(it);
    }
}

んで、ストリームをパラレルでギュイィィンさせたかったらAkka Streamとか導入すればいいのでは?という感じです。

doc.akka.io

いろんなサードパーティがStream APIの実装を提供してくれて、データベースからKafkaのようなメッセージキューから統一的なAPIかつマルチスレッドでギュイィィン出来るのが理想だったのでしょうけど、実際はデータベースへの接続を提供する実装が1つあるくらいな感じです。*2

そんだったらフツーにAkka Streams + Alpakkaの方が便利じゃんになるわけです。

おわりに

簡単に並列化できるなんて大概幻想なので、みんな闇落ちしてAkka Streamsに手を染めましょう!!!!11111

おわり

*1:Streamが順次と並列を扱えることからこの意図を感じた。どっかに書いてあったわけではない。

*2:どちらかと言うとORM的な側面が強そうだから実は違う?

Spark ShellをScalaで使うときのメモ

はじめに

最近データの整形等にSpark Shellを使っているのですが、使い方を忘れるので備忘録的なアレです。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)

バージョンはこんな感じです。

データソース読み込み

CSVエンコーディング指定・ヘッダからカラム名を推定)

val prods = spark.read.format("csv").
    option("header", true).
    option("encoding", "shift_jis").
    load("Prod.csv")

ヘッダからカラム名を推定してくれるのでスキーマを用意しなくてもOKです。 ただ、当然ながら全部のカラムがString扱いになります。

また、エンコーディングShift_JISなど指定できるので、Excelが正義 オブ 正義なCSV界隈でも生きていけます。

CSVエンコーディング指定・スキーマを指定)

import org.apache.spark.sql.types._

val prodSchema= StructType(Seq(
    StructField("商品コード", IntegerType, false),
    StructField("商品名", StringType, false),
    StructField("価格", LongType, false)))

val prods = spark.read.format("csv").
    option("header", true).
    option("encoding", "shift_jis").
    schema(prodSchema).load("Prod.csv")

スキーマを指定してCSVを読み込みます。

データベースから読み込み(Postgres)

データベースから読み込むときは、JDBCドライバへのパスを指定してSpark Shellを起動する必要があります。

spark-shell --driver-class-path .\postgresql-42.2.8.jar jars .\postgresql-42.2.8.jar
val sales = spark.
    read.format("jdbc").
    option("url", "jdbc:postgresql://localhost:5432/testdb").
    option("user", "jyuch").
    option("password", "*****").
    option("dbtable", "public.sales").
    load

クエリの結果をロードすることも出来ます。

val sales = spark.
    read.format("jdbc").
    option("url", "jdbc:postgresql://localhost:5432/testdb").
    option("user", "jyuch").
    option("password", "*****").
    option("query", "select * from Sales where num > 5").
    load

フィルタリング

prods.filter($"商品名" === "テレビ") // 完全一致

prods.filter($"商品名" equalTo "テレビ") // 完全一致その2

prods.filter($"商品名" startsWith "テ") // 前方一致

prods.filter($"商品名" endsWith "ビ") // 後方一致

prods.filter($"商品名" contains "レ") // 含む

prods.filter($"価格" >= "50000") // 以上

prods.filter($"価格" > "50000") // より大きい

prods.filter($"価格" <= "50000") // 以下

prods.filter($"価格" < "50000") // より小さい

prods.filter($"価格" > 10000 and $"価格" < 100000) // and とか or も使える

フィルタリングはfilterもしくはwhereを使えます。SQLの語彙を借りるならwhereでしょうけど、正直どっちでも良いと思います。

結合

sales.join(prods, $"商品コード" === $"product_id", "inner")

見た感じで大体分かりますが、結合するDataFrame結合条件結合タイプを指定します。

また、結合タイプにはinnerouterfullfullouterfull_outerleftouterleftleft_outerrightouterrightright_outerleftsemileft_semileftantileft_anticrossが指定できます。

カラムの計算

カラム間の計算を行うにはwithColumnを使います。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num")

定数カラムを追加するにはlitを使います。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("hoge", lit("fuga"))

CSV出力

CSVに出力するときはcoalece(1)を指定してパーテーションを1つに集約しないとバラバラなCSVが出力されます。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num").
    coalesce(1).write.
    mode("append").
    option("header", "true").
    csv("output")