パーティションのインデックスとrownumberをrddにパーティションに追加しようとしています。しかし、私が最後のrownumberの値を取得しようとしたとき、私はゼロを得ました、rownumber配列は手つかずのようでした。可変スコープの問題?spark-shellのmapPartitionsWithIndexを使用して、rowNumber()を超える(partition_index)
これはrowNumber()/ count()以上(partition_index)ですが、rownumberはパーティションインデックスとともに1つのループに追加されています。
scala> val rdd1 = sc.makeRDD(100 to 110)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:25
scala> val rownums=new Array[Int](3)
rownums: Array[Int] = Array(0, 0, 0)
scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums(idx)+=1;rownums(idx)}, r)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[37] at mapPartitionsWithIndex at <console>:29
scala> rdd2.collect.foreach(println)
(0,1,100)
(0,2,107)
(0,3,104)
(0,4,105)
(0,5,106)
(0,6,110)
(1,1,102)
(1,2,108)
(1,3,103)
(2,1,101)
(2,2,109)
scala> //uneffected??
scala> rownums.foreach(println)
0
0
0
scala> rownums
res20: Array[Int] = Array(0, 0, 0)
私は(6,3,2)を期待していrownumsため:(
解決使用してアキュムレータ:
scala> import org.apache.spark.util._
import org.apache.spark.util._
scala> val rownums=new Array[LongAccumulator](3)
rownums: Array[org.apache.spark.util.LongAccumulator] = Array(null, null, null)
scala> for(i <- 0 until rownums.length){rownums(i)=sc.longAccumulator("rownum_"+i)}
scala> val rdd1 = sc.makeRDD(100 to 110)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:124
scala> val rownums2=new Array[Int](3)
rownums2: Array[Int] = Array(0, 0, 0)
scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums2(idx)+=1;rownums(idx).add(1);rownums2(idx)}, r)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[97] at mapPartitionsWithIndex at <console>:130
scala> rdd2.collect.foreach(println)
(0,1,107)
(0,2,106)
(0,3,105)
(0,4,110)
(0,5,104)
(0,6,100)
(1,1,102)
(1,2,103)
(1,3,108)
(2,1,109)
(2,2,101)
scala> rownums.foreach(x=>println(x.value))
6
3
2
scala>
あなたは何をしようとしていますか、各パーティションの行数を取得しますか? – puhlen
私はrddパーティションの各行にrow_numberを追加しようとしています。アキュムレータは私の問題を解決しました。 – myeyre