2016-12-09 8 views
1

私は巨大なRDD(ソース)を持っており、BloomFilterのデータを作成する必要があるので、ユーザのデータへのその後の更新では真の "diffs"SparkとBloomFilterの共有

ブルームフィルタの実装のほとんどが(つまりかかわらず簡単に固定することができる)非直列化​​可能ですが、私は少し異なるワークフローをしたいように見える:

  1. プロセスのすべてのパーティションを、適切なブルームフィルタのインスタンスを作成しますそれぞれのパーティションに対して実行されます。それらのBloomFilterオブジェクトのそれぞれについて、どこかのバイナリファイルに書き込みます。実際にパーティション全体をどのように処理するかはわかりません - mapPartition関数がRDDで利用可能ですが、それはIteratorを返すことを期待しています。おそらく、渡されたイテレータを消費し、BloomFilterのインスタンスを作成し、どこかに書き込んで、リンクを作成ファイルのIterator.singleton[PathToFile]として返しますか?
  2. マスターノードで - consumeその処理の結果(Fileへのパスのリスト)を読み込み、それらのファイルを読み取り、メモリ内のBloomFiltersを集約します。次に、レスポンスをバイナリファイルに書き込みます。

私は、正しい方法を知らない:渡された関数内から(それはHDFS、S3Nまたはローカルファイルすることができる)

  • クラスタでサポートされているFSにファイルを作成します〜mapPartitions
  • 2番目のフェーズのファイルの内容をconsumeと読みます(ファイルへのパスがあるRDDがあり、それを読むにはSparkContextを使用する必要があります)。

ありがとう!

答えて

1

breeze実装が最速のものではありませんが、それは通常のスパークの依存関係が付属していますし、simple aggregateで使用することができます。

スパークで
import breeze.util.BloomFilter 

// Adjust values to fit your case 
val numBuckets: Int = 100 
val numHashFunctions: Int = 30 

val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4) 
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
    _ += _, _ |= _ 
) 

bf.contains("a") 
Boolean = true 
bf.contains("n") 
Boolean = false 

2.0+あなたがDataFrameStatFunctions.bloomFilterを使用することができます。

val df = rdd.toDF 

val expectedNumItems: Long = 1000 
val fpp: Double = 0.005 

val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp) 

sbf.mightContain("a") 
Boolean = true 
sbf.mightContain("n") 
Boolean = false 

Algebird実装は同様に動作し、breeze実装と同様に使用することができます。

関連する問題