Akka StreamsでもMyBatisを使いたい

はじめに

正直かなりニッチな需要だと思うのですが、Akka StreamsのMyBatis用コネクタを実装したのでそのお話です。

github.com

JavaでAkka Streamsを使ったときにデータベースへのアクセスを楽にしたかったのが動機ですが、JavaからAkka Streamsを使うこと自体がまぁ、その、ねぇ

インストール

Maven Centralにアーティファクトが登録されていますので、お好きなビルドツールで指定するだけで依存関係が降ってきます。

<dependency>
  <groupId>dev.jyuch</groupId>
  <artifactId>alpakka-mybatis_2.12</artifactId>
  <version>0.1.0</version>
</dependency>
implementation 'dev.jyuch:alpakka-mybatis_2.12:0.1.0'
libraryDependencies += "dev.jyuch" %% "alpakka-mybatis" % "0.1.0"

つかいかた

public class User {
    private int id;
    private String name;

    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }
}

public interface UserMapper {
    int initialize();
    Cursor<User> select();
    User selectById(int id);
    int insert(User user);
}

が定義されており、MyBatisが適切に構成されているものとします。

Source

public static void source(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<User, CompletionStage<IOResult>> source = MyBatis.source(
            sqlSessionFactory::openSession,
            sqlSession -> sqlSession.getMapper(UserMapper.class).select()
    );

    /*
     * User{id=1, name='alice'}
     * User{id=2, name='bob'}
     */
    CompletionStage<Done> onComplete = source.runForeach(System.out::println, system);
    onComplete.toCompletableFuture().get();
}

Sourceはカーソルからひたすらストリームに流し込む設計にしています。 カーソルが一度にフェッチするサイズはMyBatis側で制御してください。

また、セッションはストリーム終了後にクローズする設計としています。

Flow

public static void flow(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2));
    Flow<Integer, String, CompletionStage<IOResult>> flow = MyBatis.flow(
            sqlSessionFactory::openSession,
            (session, i) -> {
                UserMapper mapper = session.getMapper(UserMapper.class);
                User user = mapper.selectById(i);
                return user.getName();
            },
            false
    );

    /*
     * alice
     * bob
     */
    CompletionStage<Done> onComplete = source.via(flow).runForeach(System.out::println, system);
    onComplete.toCompletableFuture().get();
}

上流から流れてきたアイテムに対してセッションを用いて処理を行うようになっています。

毎回マッパーを生成するのはどうかなと思いましたが、まぁその辺はgroupedを使ってまとめて処理するようにすればいい感じに性能が出てくれると思っています。

Sink

public static void sink(ActorSystem system, SqlSessionFactory sqlSessionFactory) throws Exception {
    Source<User, NotUsed> source = Source.from(
            Arrays.asList(
                    new User(3, "Carol"),
                    new User(3, "Dave")));
    Sink<User, CompletionStage<IOResult>> sink = MyBatis.sink(
            sqlSessionFactory::openSession,
            (session, it) -> {
                UserMapper mapper = session.getMapper(UserMapper.class);
                mapper.insert(it);
            },
            true
    );
    CompletionStage<IOResult> onComplete = source.toMat(sink, Keep.right()).run(system);
    onComplete.toCompletableFuture().get();

    SqlSession session = sqlSessionFactory.openSession();
    UserMapper mapper = session.getMapper(UserMapper.class);

    /*
     * User{id=1, name='alice'}
     * User{id=2, name='bob'}
     * User{id=3, name='Carol'}
     * User{id=3, name='Dave'}
     */
    for (User it : mapper.select()) {
        System.out.println(it);
    }
}

こちらも基本的な考え方はFlowと同じです。

github.com

おわり