2017-12-07 2 views
1

RDDからブルームフィルタのセットを作成します。私は次のように見えるやり方:rddRDD[(Int, Long)]Spark aggregateByKey:キーを使用

のようなものである場合には

rdd.aggregateByKey(create(size))(add, combine).collect() 

問題は値が各グループ内に均一に分布されていないということです。つまり、「1つのsizeはすべてフィットします」はここでは機能しません。いくつの要素があるかによって、それぞれのキーごとにブルームフィルタを別々に初期化したいと考えています。

だから私は

countがカウントしたマップである
rdd.aggregateByKey(key => create(count(key))(add, combine).collect() 

ような何かをしたいです。

これは私がgroupByKeyとすることができるものですが、他の方法はありますか?

答えて

1

使用できる簡単なトリックがあります。 (key, (key, value))(key, value)からRDD変換:

val kkvRdd = rdd.map { case (key, value) => (key, (key, value)) } 

combineByKeyを適用します。

addcombineは、新しいタイプを受け入れるように調整されている
def createCombiner(v: (Int, Long)) = create(count(_._1)) // And insert _._2 

kkvRdd.combineByKey(createCombiner, add, combine) 

+0

ありがとう、これは私のケースでは完全に機能します –

関連する問題