Spark ShellをScalaで使うときのメモ

はじめに

最近データの整形等にSpark Shellを使っているのですが、使い方を忘れるので備忘録的なアレです。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)

バージョンはこんな感じです。

データソース読み込み

CSVエンコーディング指定・ヘッダからカラム名を推定)

val prods = spark.read.format("csv").
    option("header", true).
    option("encoding", "shift_jis").
    load("Prod.csv")

ヘッダからカラム名を推定してくれるのでスキーマを用意しなくてもOKです。 ただ、当然ながら全部のカラムがString扱いになります。

また、エンコーディングShift_JISなど指定できるので、Excelが正義 オブ 正義なCSV界隈でも生きていけます。

CSVエンコーディング指定・スキーマを指定)

import org.apache.spark.sql.types._

val prodSchema= StructType(Seq(
    StructField("商品コード", IntegerType, false),
    StructField("商品名", StringType, false),
    StructField("価格", LongType, false)))

val prods = spark.read.format("csv").
    option("header", true).
    option("encoding", "shift_jis").
    schema(prodSchema).load("Prod.csv")

スキーマを指定してCSVを読み込みます。

データベースから読み込み(Postgres)

データベースから読み込むときは、JDBCドライバへのパスを指定してSpark Shellを起動する必要があります。

spark-shell --driver-class-path .\postgresql-42.2.8.jar jars .\postgresql-42.2.8.jar
val sales = spark.
    read.format("jdbc").
    option("url", "jdbc:postgresql://localhost:5432/testdb").
    option("user", "jyuch").
    option("password", "*****").
    option("dbtable", "public.sales").
    load

クエリの結果をロードすることも出来ます。

val sales = spark.
    read.format("jdbc").
    option("url", "jdbc:postgresql://localhost:5432/testdb").
    option("user", "jyuch").
    option("password", "*****").
    option("query", "select * from Sales where num > 5").
    load

フィルタリング

prods.filter($"商品名" === "テレビ") // 完全一致

prods.filter($"商品名" equalTo "テレビ") // 完全一致その2

prods.filter($"商品名" startsWith "テ") // 前方一致

prods.filter($"商品名" endsWith "ビ") // 後方一致

prods.filter($"商品名" contains "レ") // 含む

prods.filter($"価格" >= "50000") // 以上

prods.filter($"価格" > "50000") // より大きい

prods.filter($"価格" <= "50000") // 以下

prods.filter($"価格" < "50000") // より小さい

prods.filter($"価格" > 10000 and $"価格" < 100000) // and とか or も使える

フィルタリングはfilterもしくはwhereを使えます。SQLの語彙を借りるならwhereでしょうけど、正直どっちでも良いと思います。

結合

sales.join(prods, $"商品コード" === $"product_id", "inner")

見た感じで大体分かりますが、結合するDataFrame結合条件結合タイプを指定します。

また、結合タイプにはinnerouterfullfullouterfull_outerleftouterleftleft_outerrightouterrightright_outerleftsemileft_semileftantileft_anticrossが指定できます。

カラムの計算

カラム間の計算を行うにはwithColumnを使います。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num")

定数カラムを追加するにはlitを使います。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("hoge", lit("fuga"))

CSV出力

CSVに出力するときはcoalece(1)を指定してパーテーションを1つに集約しないとバラバラなCSVが出力されます。

sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num").
    coalesce(1).write.
    mode("append").
    option("header", "true").
    csv("output")