2017-05-17 14 views
0

同じキーと異なる値を持つ2つのRDDがあります。 私は .partitionBy(partitioner)同じそれらの両方に呼び出してから、私はそれらを結合:同じキーを持つ2つのRDDに同じHashPartitionerを適用すると、同じパーティションにならない

val partitioner = new HashPartitioner(partitions = 4) 

val a = spark.sparkContext.makeRDD(Seq(
    (1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E"), (6, "F"), (7, "G"), (8, "H") 
)).partitionBy(partitioner) 

val b = spark.sparkContext.makeRDD(Seq(
    (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e"), (6, "f"), (7, "g"), (8, "h") 
)).partitionBy(partitioner) 

println("A:") 
a.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("B:") 
b.foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

println("Join:") 
a.join(b, partitioner) 
    .foreachPartition(p => { 
    p.foreach(t => print(t + " ")) 
    println() 
}) 

私が手:

A: 
(2,B) (3,C) (4,D) (6,F) (7,G) 
(8,H) (1,A) 
(5,E) 

B: 
(3,c) (7,g) 
(1,a) (5,e) 
(2,b) (6,f) 
(4,d) (8,h) 

Join: 
(6,(F,f)) (1,(A,a)) (2,(B,b)) (5,(E,e)) (4,(D,d)) (8,(H,h)) 
(3,(C,c)) (7,(G,g)) 

AとBのパーティションが異なっていると、なぜjoinRDDである理由だから、最初の質問です両方とも違う?

答えて

1

パーティション分割は、すべてのケースでまったく同じです。問題は、使用する方法です。各パーティションは別々のスレッドで処理されることに注意してください。このコードを複数回実行すると、出力が実際には非決定的であることがわかります。場合は、各パーティション内の値の順序は、まだ非決定できること

a.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,D) (8,H) 
(1,A) (5,E) 
(2,B) (6,F) 
(3,C) (7,G) 
b.glom.collect.map(_.mkString(" ")).foreach(println) 
(4,d) (8,h) 
(1,a) (5,e) 
(2,b) (6,f) 
(3,c) (7,g) 
a.join(b).glom.collect.map(_.mkString(" ")).foreach(println) 
(4,(D,d)) (8,(H,h)) 
(1,(A,a)) (5,(E,e)) 
(6,(F,f)) (2,(B,b)) 
(3,(C,c)) (7,(G,g)) 

注:代わりに、このような例を何かのため

てみてください非localコンテキストで実行されますが、各パーティションの内容は失われますlは上記と同じです。

関連する問題