2017-05-02 20 views
0
(accountid, mid, url, spent) 
RDD(("55E5", 5, "https://www.google.com/", 5774), 
("55E5", 5, "https://www.google.com/", 543), 
("55E5", 5, "https://www.google.com/", 52), 
("55E5", 5, "https://www.google.com/", 85), 
("55E5", 5, "https://www.google.com/", 54), 
("55E5", 5, "https://www.google.com/", 287), 
("54XJ", 5, "https://www.google.com/", 853), 
("54XJ", 5, "https://www.google.com/", 2), 
("54XJ", 5, "https://www.google.com/", 55), 
("54XJ", 5, "https://www.google.com/", 984), 
("54XJ", 5, "https://www.google.com/", 24), 
("54XJ", 5, "https://www.google.com/", 57)) 
("745K", 5, "https://www.google.com/", 853), 
("745K", 5, "https://www.google.com/", 2), 
("745K", 5, "https://www.google.com/", 55), 
("745K", 5, "https://www.google.com/", 984), 
("745K", 5, "https://www.google.com/", 24), 
("745K", 5, "https://www.google.com/", 57)) 

私はこのようなタプルのRDDを持っていると言いますが、上記のように注文されていません。私は上記の各口座IDのために最高の3つの最高額に戻したいと思っています。スパークで減少するフィルタ

私はそれらを.sortBy(x => (x._1, x._4))で注文してから折り畳みをしていると思っていましたが、自分のRDDに戻す方法はわかりません。これを行うにはよりエレガントな方法が必要です。また、場合によっては3つ以下のアイテムがあるかもしれません。私は戻って私のRDDに追加する方法がわからない

+0

最終結果はどうなるでしょうか? 'RDD [(accountID、Seq [費やし])]'? – puhlen

答えて

1

... ...

スパークで作業する場合、あなたは常に、程度は新しいRDDにデータを変換する必要があると思います"更新中"既存のRDD:RDDは不変であり、Sparkはを介して1つのRDDの変換をで計算します。

具体的には、IDでデータを「グループ化」して、結果の「グループ」にいくつかのロジック(ソートしてトップ3を取る)を適用するように見えます。これを実現するには、以下の2つの方法があります.1つはこのフロー(グループ、sort + takeを使用したマップ値)のかなり簡単な実装で、もう1つは特定の状況下で重要な最適化です。レコード)

// just an alias to make things shorter to write... 
type Record = (String, Int, String, Int) 

// simple, but potentially slow/risky: 
// groupBy "collects" all records with same key into a single record, which means 
// it can't scale well if a single key has many records: 
val result1: RDD[Record] = rdd.groupBy(_._1).values.flatMap(_.toList.sortBy(-_._4).take(3)) 

// an alternative approach that does the same, but should be faster 
// and less fragile - at no point would we collect all records of a single key 
// into a collection in one worker's memory. We do that by replacing "groupByKey" 
// with "aggregateByKey" with functions that would keep only top 3 items per key at all times 
val result2: RDD[Record] = rdd.keyBy(_._1) 
    .aggregateByKey(mutable.SortedSet[Record]()(Ordering.by(-_._4)))(
    { case (list, item) => (list + item).take(3) }, 
    { case (list1, list2) => (list1 ++ list2).take(3) } 
).values 
    .flatMap(_.toList) 
+0

ありがとうございました。 1つのレコードが最初のインスタンスで持つことができるのは、おそらく〜500kレコードです。それは問題だろうか? – ozzieisaacs

+1

500Kの音で十分ですが、問題ありません。まず、Sparkの_partitionメモリサイズ_に2GBの制限があります。これらのレコードのうち500Kは、これらの文字列の長さに応じて、0.5〜1GBまで簡単にcomsumeすることができます。そしてそれがなくても、Sparkは1つのコアを使用して500Kのレコードを作成しなければなりません。 –

関連する問題