2017-08-29 10 views
1

スパーク/ Scalaは使用してReduceByKeyを使用して入れ子構造を作成します。私は巨大なデータに対してはうまく機能しないgroupBy関数を使ってこれを行うことができます。ですから、reduceByKeyを使ってやりたいのですが、私が望むものを得ることができません。どんな助けもありがとう。スパーク/スカラ:RDDのみ</p> <p>私はRDDを使用して唯一の入れ子構造を作成したい使用してReduceByKeyを使用して入れ子構造の作成:RDDのみ

入力データ:

val sales=sc.parallelize(List(
    ("West", "Apple", 2.0, 10), 
    ("West", "Apple", 3.0, 15), 
    ("West", "Orange", 5.0, 15), 
    ("South", "Orange", 3.0, 9), 
    ("South", "Orange", 6.0, 18), 
    ("East", "Milk", 5.0, 5))) 

必要な出力は、構造体のリストです。私は以下のように使用して、このgroupByKeyを行うことができる午前:

sales.map(value => (value._1 ,(value._2,value._3,value._4 ))) 
    .groupBy(_._1) 
    .map { case(k,v) => (k, v.map(_._2)) } 
    .collect() 
    .foreach(println) 

// (South,List((Orange,3.0,9), (Orange,6.0,18))) 
// (East,List((Milk,5.0,5))) 
// (West,List((Apple,2.0,10), (Apple,3.0,15), (Orange,5.0,15))) 

しかし、私はreduceByKeyを使用して同じことを達成したいです。私はリスト[構造]を得ることができません。代わりにList [List]を取得できます。 List [Struct]を取得する方法はありますか?

sales.map(value => (value._1 ,List(value._2,value._3,value._4))) 
    .reduceByKey((a,b) => (a ++ b)) 
    .collect() 
    .foreach(println) 

// (South,List(Orange, 3.0, 9, Orange, 6.0, 18)) 
// (East,List(Milk, 5.0, 5)) 
// (West,List(Apple, 2.0, 10, Apple, 3.0, 15, Orange, 5.0, 15)) 

sales.map(value => (value._1 ,List(value._2,value._3,value._4))) 
    .reduceByKey((a,b) =>(List(a) ++ List(b))) 
    .collect() 
    .foreach(println) 

// (South,List(List(Orange, 3.0, 9), List(Orange, 6.0, 18))) 
// (East,List(Milk, 5.0, 5)) 
// (West,List(List(List(Apple, 2.0, 10), List(Apple, 3.0, 15)), List(Orange, 5.0, 15))) 

答えて

2
  • あなたはできませんからreduceByKey機能(V, V) ⇒ Vが必要ですので、それはタイプを変更することはできません。あなたはそれがパフォーマンスが改善されませんaggregateByKeycombineByKeyが、を使用することができますあなたのプロセスは、データの量を減らすことはありませんので、例Can reduceBykey be used to change type and combine values - Scala Spark?
  • を参照してください。例えばSpark groupByKey alternativeを参照してください。
  • あなたは(一時オブジェクトは必要ありません)で少しを得ることができます。

    sales.map(value => (value._1 ,(value._2,value._3,value._4))).groupByKey 
    
関連する問題