Spark ShellをScalaで使うときのメモ
はじめに
最近データの整形等にSpark Shellを使っているのですが、使い方を忘れるので備忘録的なアレです。
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
バージョンはこんな感じです。
データソース読み込み
CSV(エンコーディング指定・ヘッダからカラム名を推定)
val prods = spark.read.format("csv"). option("header", true). option("encoding", "shift_jis"). load("Prod.csv")
ヘッダからカラム名を推定してくれるのでスキーマを用意しなくてもOKです。
ただ、当然ながら全部のカラムがString扱いになります。
また、エンコーディングをShift_JISなど指定できるので、Excelが正義 オブ 正義なCSV界隈でも生きていけます。
CSV(エンコーディング指定・スキーマを指定)
import org.apache.spark.sql.types._ val prodSchema= StructType(Seq( StructField("商品コード", IntegerType, false), StructField("商品名", StringType, false), StructField("価格", LongType, false))) val prods = spark.read.format("csv"). option("header", true). option("encoding", "shift_jis"). schema(prodSchema).load("Prod.csv")
データベースから読み込み(Postgres)
データベースから読み込むときは、JDBCドライバへのパスを指定してSpark Shellを起動する必要があります。
spark-shell --driver-class-path .\postgresql-42.2.8.jar jars .\postgresql-42.2.8.jar
val sales = spark. read.format("jdbc"). option("url", "jdbc:postgresql://localhost:5432/testdb"). option("user", "jyuch"). option("password", "*****"). option("dbtable", "public.sales"). load
クエリの結果をロードすることも出来ます。
val sales = spark. read.format("jdbc"). option("url", "jdbc:postgresql://localhost:5432/testdb"). option("user", "jyuch"). option("password", "*****"). option("query", "select * from Sales where num > 5"). load
フィルタリング
prods.filter($"商品名" === "テレビ") // 完全一致 prods.filter($"商品名" equalTo "テレビ") // 完全一致その2 prods.filter($"商品名" startsWith "テ") // 前方一致 prods.filter($"商品名" endsWith "ビ") // 後方一致 prods.filter($"商品名" contains "レ") // 含む prods.filter($"価格" >= "50000") // 以上 prods.filter($"価格" > "50000") // より大きい prods.filter($"価格" <= "50000") // 以下 prods.filter($"価格" < "50000") // より小さい prods.filter($"価格" > 10000 and $"価格" < 100000) // and とか or も使える
フィルタリングはfilterもしくはwhereを使えます。SQLの語彙を借りるならwhereでしょうけど、正直どっちでも良いと思います。
結合
sales.join(prods, $"商品コード" === $"product_id", "inner")
見た感じで大体分かりますが、結合するDataFrame、結合条件、結合タイプを指定します。
また、結合タイプにはinner、outer、full、fullouter、full_outer、leftouter、left、left_outer、rightouter、right、right_outer、leftsemi、left_semi、leftanti、left_anti、crossが指定できます。
カラムの計算
カラム間の計算を行うにはwithColumnを使います。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num")
定数カラムを追加するにはlitを使います。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("hoge", lit("fuga"))
CSV出力
CSVに出力するときはcoalece(1)を指定してパーテーションを1つに集約しないとバラバラなCSVが出力されます。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num"). coalesce(1).write. mode("append"). option("header", "true"). csv("output")