Spark ShellをScalaで使うときのメモ
はじめに
最近データの整形等にSpark Shellを使っているのですが、使い方を忘れるので備忘録的なアレです。
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_222)
バージョンはこんな感じです。
データソース読み込み
CSV(エンコーディング指定・ヘッダからカラム名を推定)
val prods = spark.read.format("csv"). option("header", true). option("encoding", "shift_jis"). load("Prod.csv")
ヘッダからカラム名を推定してくれるのでスキーマを用意しなくてもOKです。
ただ、当然ながら全部のカラムがString
扱いになります。
また、エンコーディングをShift_JISなど指定できるので、Excelが正義 オブ 正義なCSV界隈でも生きていけます。
CSV(エンコーディング指定・スキーマを指定)
import org.apache.spark.sql.types._ val prodSchema= StructType(Seq( StructField("商品コード", IntegerType, false), StructField("商品名", StringType, false), StructField("価格", LongType, false))) val prods = spark.read.format("csv"). option("header", true). option("encoding", "shift_jis"). schema(prodSchema).load("Prod.csv")
データベースから読み込み(Postgres)
データベースから読み込むときは、JDBCドライバへのパスを指定してSpark Shellを起動する必要があります。
spark-shell --driver-class-path .\postgresql-42.2.8.jar jars .\postgresql-42.2.8.jar
val sales = spark. read.format("jdbc"). option("url", "jdbc:postgresql://localhost:5432/testdb"). option("user", "jyuch"). option("password", "*****"). option("dbtable", "public.sales"). load
クエリの結果をロードすることも出来ます。
val sales = spark. read.format("jdbc"). option("url", "jdbc:postgresql://localhost:5432/testdb"). option("user", "jyuch"). option("password", "*****"). option("query", "select * from Sales where num > 5"). load
フィルタリング
prods.filter($"商品名" === "テレビ") // 完全一致 prods.filter($"商品名" equalTo "テレビ") // 完全一致その2 prods.filter($"商品名" startsWith "テ") // 前方一致 prods.filter($"商品名" endsWith "ビ") // 後方一致 prods.filter($"商品名" contains "レ") // 含む prods.filter($"価格" >= "50000") // 以上 prods.filter($"価格" > "50000") // より大きい prods.filter($"価格" <= "50000") // 以下 prods.filter($"価格" < "50000") // より小さい prods.filter($"価格" > 10000 and $"価格" < 100000) // and とか or も使える
フィルタリングはfilter
もしくはwhere
を使えます。SQLの語彙を借りるならwhere
でしょうけど、正直どっちでも良いと思います。
結合
sales.join(prods, $"商品コード" === $"product_id", "inner")
見た感じで大体分かりますが、結合するDataFrame
、結合条件
、結合タイプ
を指定します。
また、結合タイプにはinner
、outer
、full
、fullouter
、full_outer
、leftouter
、left
、left_outer
、rightouter
、right
、right_outer
、leftsemi
、left_semi
、leftanti
、left_anti
、cross
が指定できます。
カラムの計算
カラム間の計算を行うにはwithColumn
を使います。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num")
定数カラムを追加するにはlit
を使います。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("hoge", lit("fuga"))
CSV出力
CSVに出力するときはcoalece(1)
を指定してパーテーションを1つに集約しないとバラバラなCSVが出力されます。
sales.join(prods, $"商品コード" === $"product_id","inner").withColumn("TotalPrice", $"価格" * $"num"). coalesce(1).write. mode("append"). option("header", "true"). csv("output")