2016-04-06 23 views
2

複数のSpark DataFrames(Scala)を効率的にマージ/結合するにはどうすればよいですか?私は、すべてのテーブルに共通の列である 'Date'を以下のように結合し、その結果として疎な配列を取得します。複数のデータフレームを結合する方法Spark Scala効率的な外部外部結合

Data Set A: 
Date Col A1 Col A2 
----------------------- 
1/1/16 A11  A21 
1/2/16 A12  A22 
1/3/16 A13  A23 
1/4/16 A14  A24 
1/5/16 A15  A25 

Data Set B: 
Date Col B1 Col B2 
----------------------- 
1/1/16 B11  B21 
1/3/16 B13  B23 
1/5/16 B15  B25 

Data Set C: 
Date Col C1 Col C2 
----------------------- 
1/2/16 C12  C22 
1/3/16 C13  C23 
1/4/16 C14  C24 
1/5/16 C15  C25 

Expected Result Set: 
Date Col A1 Col A2 Col B1 Col B2 Col C1 Col C2 
--------------------------------------------------------- 
1/1/16 A11  A21  B11  B12 
1/2/16 A12  A22      C12  C22 
1/3/16 A13  A23  B13  B23  C13  C23 
1/4/16 A14  A24      C14  C24 
1/5/16 A15  A25  B15  B25  C15  C25 

これは複数のテーブルで完全外部結合のように感じますが、わかりません。 DataFramesのJoinメソッドを使用しないで、この疎な配列を取得するには、より簡単で効率的な方法がありますか?

答えて

2

これは古い投稿ですので、OPがまだチューニングされているかわかりません。とにかく、希望の結果を得る簡単な方法はcogroup()です。それぞれのRDDを日付がキーである[K,V] RDDに変換してから、cogroupを使用します。ここに例があります:

def mergeFrames(sc: SparkContext, sqlContext: SQLContext) = { 

import sqlContext.implicits._ 

// Create three dataframes. All string types assumed. 
val dfa = sc.parallelize(Seq(A("1/1/16", "A11", "A21"), 
    A("1/2/16", "A12", "A22"), 
    A("1/3/16", "A13", "A23"), 
    A("1/4/16", "A14", "A24"), 
    A("1/5/16", "A15", "A25"))).toDF() 

val dfb = sc.parallelize(Seq(
    B("1/1/16", "B11", "B21"), 
    B("1/3/16", "B13", "B23"), 
    B("1/5/16", "B15", "B25"))).toDF() 

val dfc = sc.parallelize(Seq(
    C("1/2/16", "C12", "C22"), 
    C("1/3/16", "C13", "C23"), 
    C("1/4/16", "C14", "C24"), 
    C("1/5/16", "C15", "C25"))).toDF() 

val rdda = dfa.rdd.map(row => row(0) -> row.toSeq.drop(1)) 
val rddb = dfb.rdd.map(row => row(0) -> row.toSeq.drop(1)) 
val rddc = dfc.rdd.map(row => row(0) -> row.toSeq.drop(1)) 

val schema = StructType("date a1 a2 b1 b2 c1 c2".split(" ").map(fieldName => StructField(fieldName, StringType))) 

// Form cogroups. `date` is assumed to be a key so there's at most one row for each date in an rdd/df 
val cg: RDD[Row] = rdda.cogroup(rddb, rddc).map { case (k, (v1, v2, v3)) => 
    val cols = Seq(k) ++ 
    (if (v1.nonEmpty) v1.head else Seq(null, null)) ++ 
    (if (v2.nonEmpty) v2.head else Seq(null, null)) ++ 
    (if (v3.nonEmpty) v3.head else Seq(null, null)) 
    Row.fromSeq(cols) 
} 

// Turn RDD back to DataFrame 
val cgdf = sqlContext.createDataFrame(cg, schema).sort("date") 

cgdf.show } 
+0

これはどのように動作するかを詳しく説明できますか? – banncee

+0

私は自分の答えを編集し、いくつかのサンプルコードを追加しました。希望が役立ちます。 –

関連する問題