2017-02-02 23 views
0

Spark-scalaを使用して現在のSql QuerysをDataFramesに変換しています。複数の内部結合を実行するクエリがありました。実際にSqlContext.sql( "")私のチームは、私たちは再帰的に複数の条件を使用して複数のデータフレームを結合するSpark Scala

List(df1,df2,df3,dfN).reduce((a, b) => a.join(b, joinCondition)) 

を使用して参加することができます知っているようになった。しかし、私はそこにいるための条件の上に満足カント私の検索からのデータフレームの先頭

si s inner join 
ac a on s.cid = a.cid and s.sid =a.sid 
inner join De d on s.cid = d.cid AND d.aid = a.aid 
inner join SGrM sgm on s.cid = sgm.cid and s.sid =sgm.sid and sgm.status=1 
inner join SiGo sg on sgm.cid =sg.cid and sgm.gid =sg.gid 
inner join bg bu on s.cid = bu.cid and s.sid =bu.sid 
inner join ls al on a.AtLId = al.lid 
inner join ls rl on a.RtLId = rl.lid 
inner join ls vl on a.VLId = vl.lid 

の操作を実行したいsqlContextに興味がありません複数の条件が含まれていますこれを実行するにはどうすればよいですか?

答えて

0

まず、DataFramesをDataSetとSpark 2. +に置き換え、JVMオブジェクトを避けてパフォーマンスを向上させます。タングステンを再プロジェクトします。

今、あなたの質問に

:あなたは4×DSを持っているとしましょうとして:

まず、あなたのテーブルのスキーマを作成します。

val ds1 = spark.read.parquet("X1").as[DS] 

val ds2 = spark.read.parquet("X2").as[DS] 

val ds3 = spark.read.parquet("X3").as[DS] 

val ds4 = spark.read.parquet("X4").as[DS] 

case class DS (id: Int, colA: String)

次に有効に最適化してファイルを読み込みます

これで、データフローに従うことができるように1つずつ結合することができます(テーブルが小さい場合にのみブロードキャストを使用します):

case class JoinedDS (colB: String) 


val joinedDS = ds1.join(broadcast(ds2), Seq("id"), "inner") 
.join(ds3, Seq("id", "colB"), "inner") 
.join(ds4, Seq("id"), "inner") 
.select(col("colB") 
.as[JoinedDS] 
+0

感謝を/ DataFramesオペレーション – Anji

+0

DataSetオブジェクトを常に渡すようにすると、パフォーマンスが向上し、各オブジェクトがDataframeに変換されていない場合に作成されたときに各オブジェクトを監視し、明示的に列名の型ex select(col( "as")。を[String]とします)、あなたが 'sql'と比較するとsparkが最適化をしてくれる限り型指定されていないDataset re dataFrameを使用しているため、最適化は行われません –

0

あなたは以下のような複数の条件で複数のデータフレームを結合することができます。私はSQLを(「」)を使用して、と比較して性能を発揮し、SQLクエリやデータセットを作成します。質問があった

val result = df1.as("df1").join(df2.as("df2"), 
       $"df1.col1"===$df2.col1" && $"df1.col2"===$df2.col2").join(df3.as("df3"), 
       $"df3.col1"===$df2.col1" && $"df3.col2"===$df2.col2", "left_outer") 
関連する問題