2016-04-11 15 views
0

は、私は2つのDataFrameを持っているとdatetimemidbinImbalance分野に応じてそれらを結合し、リストにtimeBmidBに対応する値を収集したいと思います。collect_listの使い方は?

私は次のコードで試してみました:

val d1: DataFrame 
val d3: DataFrame 
val d2 = d3 
    .withColumnRenamed("date", "dateC") 
    .withColumnRenamed("milliSec", "milliSecC") 
    .withColumnRenamed("mid", "midC") 
    .withColumnRenamed("time", "timeC") 
    .withColumnRenamed("binImbalance", "binImbalanceC") 

    d1.join(d2, d1("date") === d2("dateC") and 
       d1("time") === d2("timeC") and 
       d1("mid") === d2("midC") 
     ) 
    .groupBy("date", "time", "mid", "binImbalance") 
    .agg(collect_list("timeB"),collect_list("midB")) 

しかし、私はエラーが出るので、これは動作しません:: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35を。 同時に、timeB列の名前を変更した場合、リスト内の値を収集することはできません。

例の結果は次のようになります。

+-----+---------+------+------------+---------+------+ 
| date|  time| mid|binImbalance| timeB| midB| 
+-----+---------+------+------------+---------+------+ 
| 1 |  1 | 10 |   1| 4 | 10 |   
| 2 |  2 | 20 |   2| 5 | 11 |    
| 3 |  3 | 30 |   3| 6 | 12 |    


+-----+---------+------+------------+---------+------+ 
| date|  time| mid|binImbalance| timeB| midB| 
+-----+---------+------+------------+---------+------+ 
| 1 |  1 | 10 |   1| 7 | 13 |   
| 2 |  2 | 20 |   2| 8 | 14 |    
| 3 |  3 | 30 |   3| 9 | 15 | 

RESULT:

+-----+---------+------+------------+---------+-----------+ 
| date|  time| mid|binImbalance| ListTime| ListMid | 
+-----+---------+------+------------+---------+-----------+ 
| 1 |  1 | 10 |   1| [4,7] | [10,13] |   
| 2 |  2 | 20 |   2| [5,8] | [11,14] |    
| 3 |  3 | 30 |   3| [6,9] | [12,15] | 

、最小完全、かつ検証例

d1   d2 
id data  id data  
-- ----  -- ---- 
1 1  1 2 
2 4  2 5 
3 6  3 3 

Result 
id list 
-- ---- 
1 [1,2] 
2 [4,5] 
3 [6,3] 
+0

'd1.printSchema'と' d3.printSchema'を質問に追加できますか? –

答えて

0

最小例に対処:

import org.apache.spark.sql.functions.udf 

val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x,y)) 

val d3 = d2.withColumnRenamed("id","id3") 
      .withColumnRenamed("data","data3") 

val joined = d1.join(d3, d1("id") === d3("id3")) 


val result = joined 
       .withColumn("list", aggregateDataFrames(joined("data"),joined("data3"))) 
       .select("id","list") 
+0

'val aggregateDataFrames:(Double、Double)=> Seq [Double]'を使い、タイプを 'udf'のままにしておきます。また、いくつかのキーストロークを減らすので(実際にはもっとうまくいくかもしれません)、列(または '$'(ドル記号))にアクセスするには '' '(単一のアポストロフィー)を使います。 –

関連する問題