2016-10-27 15 views
0

Apacheスパークでは、sparkContext.union()メソッドを使用して複数のRDDを効率的に結合できます。誰かが複数のRDDを交差させたい場合、似たようなものがありますか?私はsparkContextメソッドを検索しましたが、何か他の場所を見つけることができませんでした。 1つの解決策は、rddsを結合してから重複を取り出すことですが、効率的であるとは思いません。私は、キー/値ペアのコレクションを次の例を持っていると仮定すると:複数のRDDSだけではなく、2のためにApache Spark - 複数のRDDの交差

(1,2.0) (1,1.0) 

しかし、もちろん:

val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0))) 
val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0))) 

は、私は、次の要素を持つ新しいコレクションを取得したいです。

+0

なぜ複数のrddを交差させたいのですか?どのような基準で? – Shankar

+0

私の質問は今理解している方が良いと思います。 –

答えて

2

試してみてください。

val rdds = Seq(
    sc.parallelize(Seq(1, 3, 5)), 
    sc.parallelize(Seq(3, 5)), 
    sc.parallelize(Seq(1, 3)) 
) 
rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys 
+0

それは、ありがとう、働く。しかし、各コレクションに、整数ではなくキー/値のペアがあってもうまくいかない場合は、正しいでしょうか?また、このメソッドは結合を使用します。通常、ハッシュ・パーティショナーは良い習慣ですよね? –

+0

要素をハッシュできる限り動作します。別の出力が必要な場合を除きます。 2番目の質問を理解しないでください。 –

+0

rdd間の結合を使用する前に、冗長な再シャフリングを避け、より効率的にするために、Hashパーティショナを使用することをお勧めします。あなたのコードでは、ハッシュパーティショニングを使用しません。 –

2

ありRDDのintersection methodはあるが、それは一つだけ、他のRDDを取ります

def intersection(other: RDD[T]): RDD[T] 

のは、あなたがこの1の観点たいメソッドを実装してみましょう。

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.reduce { case (left, right) => left.intersection(right) 
} 

あなたが参加し、あなたが最初の最大のRDDを置くことによって実行を最適化することができスパークの実装を見てきました場合:

def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = { 
    rdds.sortBy(rdd => -1 * rdd.partitions.length) 
    .reduce { case (left, right) => left.intersection(right) 
} 

EDITを:あなたのテキストを:私はあなたの例を読み違えるように見えますあなたはrdd.unionの逆の振る舞いを探しているように見えましたが、あなたの例はキーで交差することを意味しています。私の答えはこのケースに対処していません。

関連する問題