2017-05-09 1 views
0

パーティションのインデックスとrownumberをrddにパーティションに追加しようとしています。しかし、私が最後のrownumberの値を取得しようとしたとき、私はゼロを得ました、rownumber配列は手つかずのようでした。可変スコープの問題?spark-shellのmapPartitionsWithIndexを使用して、rowNumber()を超える(partition_index)

これはrowNumber()/ count()以上(partition_index)ですが、rownumberはパーティションインデックスとともに1つのループに追加されています。

scala> val rdd1 = sc.makeRDD(100 to 110) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:25 

scala> val rownums=new Array[Int](3) 
rownums: Array[Int] = Array(0, 0, 0) 

scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums(idx)+=1;rownums(idx)}, r))) 
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[37] at mapPartitionsWithIndex at <console>:29 

scala> rdd2.collect.foreach(println) 
(0,1,100) 
(0,2,107) 
(0,3,104) 
(0,4,105) 
(0,5,106) 
(0,6,110) 
(1,1,102) 
(1,2,108) 
(1,3,103) 
(2,1,101) 
(2,2,109) 

scala> //uneffected?? 

scala> rownums.foreach(println) 
0 
0 
0 

scala> rownums 
res20: Array[Int] = Array(0, 0, 0) 

私は(6,3,2)を期待していrownumsため:(


解決使用してアキュムレータ:

scala> import org.apache.spark.util._ 
import org.apache.spark.util._ 

scala> val rownums=new Array[LongAccumulator](3) 
rownums: Array[org.apache.spark.util.LongAccumulator] = Array(null, null, null) 

scala> for(i <- 0 until rownums.length){rownums(i)=sc.longAccumulator("rownum_"+i)} 

scala> val rdd1 = sc.makeRDD(100 to 110) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:124 

scala> val rownums2=new Array[Int](3) 
rownums2: Array[Int] = Array(0, 0, 0) 

scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums2(idx)+=1;rownums(idx).add(1);rownums2(idx)}, r))) 
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[97] at mapPartitionsWithIndex at <console>:130 

scala> rdd2.collect.foreach(println) 
(0,1,107)                  
(0,2,106) 
(0,3,105) 
(0,4,110) 
(0,5,104) 
(0,6,100) 
(1,1,102) 
(1,2,103) 
(1,3,108) 
(2,1,109) 
(2,2,101) 

scala> rownums.foreach(x=>println(x.value)) 
6 
3 
2 

scala> 
+0

あなたは何をしようとしていますか、各パーティションの行数を取得しますか? – puhlen

+0

私はrddパーティションの各行にrow_numberを追加しようとしています。アキュムレータは私の問題を解決しました。 – myeyre

答えて

0

してくださいここで

は、コードが来ますプログラミングガイドの Understanding closuresを読んでください:

実行前に、Sparkはタスクの終了を計算します。クロージャは、実行者がRDD(この場合foreach())で計算を実行するために表示されなければならない変数とメソッドです。このクロージャはシリアライズされ、各エグゼキュータに送信されます。

各エグゼキュータに送信されるクロージャ内の変数はコピーされるため、foreach関数内でカウンタが参照されると、ドライバノードのカウンタは使用できなくなります。まだドライバノードのメモリにカウンタがありますが、これはもはやエグゼキュータには見えません!エグゼキュータは、シリアライズされたクロージャからコピーを見るだけです。したがって、カウンタの最終的な値は、カウンタのすべての操作がシリアライズされたクロージャ内の値を参照していたので、依然としてゼロになります。

元の変数ではなく、変数のローカルコピーを変更しています。

+0

ローカルコピーをエグゼキュータからドライバに取り出すことは可能ですか? – myeyre

+0

リンクをありがとう、アキュムレータをチェックします。 – myeyre

0

分散システムで実行されます。つまり、関数外の要素を変更するアクセス権がありません。

各パーティションのカウントを持つアレイを取得する場合は、RDDをRDD[Int]に変換する必要があります。ここで、各行はパーティションのカウントです。

rdd.mapPartitions(itr => Iterator(itr.size)) 

パーティションインデックスが重要な場合は、行数とそれを一緒に含めるように作成してRDD[Int,Int]することができます。

rdd.mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size))) 
関連する問題