2017-05-26 13 views
0

私はスカラの/:演算子を使用して一連のデータセット集計を計算しています。集計のためのコードは以下の通りである:私はFパラメータとして渡され別々の機能のリストについてはブロックを実行しようとすると、問題ScalaのSparkデータセットの集約

def execute1( 
xy: DATASET, 
f: Double => Double): Double = { 

println("PRINTING: The data points being evaluated: " + xy) 
println("PRINTING: Running execute1") 

var z = xy.filter{ case(x, y) => abs(y) > EPS} 

var ret = - z./:(0.0) { case(s, (x, y)) => { 
    var px = f(x) 
    s + px*log(px/y)} 
} 

ret 
} 

私の問題が発生します。機能のリストである:

lazy val pdfs = Map[Int, Double => Double](
1 -> betaScaled, 
2 -> gammaScaled, 
3 -> logNormal, 
4 -> uniform, 
5 -> chiSquaredScaled 
) 

リストを集計を実行するエグゼキュータの関数である:最終実行ブロックに

def execute2( 
xy: DATASET, 
fs: Iterable[Double=>Double]): Iterable[Double] = { 
fs.map(execute1(xy, _)) 
} 

val kl_rdd = master_ds.mapPartitions((it:DATASET) => { 
val pdfsList = pdfs_broadcast.value.map(
    n => pdfs.get(n).get 
) 

execute2(it, pdfsList).iterator 

問題は、一方で集計が行われると、出力配列の最初のスロットにすべて集計されているように見えますが、各関数の集計を別々に表示したいと思っています。私は5つの機能すべてが実際に実行されていること、そしてそれらが最初のスロットで合計されていることを確認するためにテストを実行しました。

The pre-divergence value: -4.999635700491883 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 
The pre-divergence value: -0.0 

これは私がこれまで実行してきた中で最も困難な問題の1つで、どの方向性も高く評価されます。その支払期日がどこであるかを示す。ありがとう! https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Datasetと実際の型パラメータDataSet[T]を必要とし、その名前は、すべての資本ケースではありません。

答えて

1

スパークのデータセットは、foldLeft(別名/:)を持っていません。

DATASETの型はイテレータなので、最初に実行した後にはexecute1になってしまいます。したがって、その後のすべてのexecute1は空のイテレータを取得します。基本的には、すべての関数を集約するのではなく、最初のものを実行し、他の関数を無視します(foldLeftに初期値として0.0を渡したため-0.0を取得します)。

あなたはmapPartitions署名から見ることができるように:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 

あなたは(潜在的に限られた大)を取得するためにit.toListを行う必要がありますので、それは、あなたのイテレータ(一度だけ横断することができ、可変構造)を与えます不変構造(List)。

P.S. SparkのDataSet/RDDで実際に作業したい場合は、aggregate(RDD)またはagg(DataSet)を使用してください。参照:イテレータについてfoldLeft or foldRight equivalent in Spark?


説明:

scala> val it = List(1,2,3).toIterator 
it: Iterator[Int] = non-empty iterator 

scala> it.toList //traverse iterator and accumulate its data into List 
res0: List[Int] = List(1, 2, 3) 

scala> it.toList //iterator is drained, so second call doesn't traverse anything 
res1: List[Int] = List()