2016-02-18 12 views
6

反復計算の結果を収集するRDDを作成したいとします。反復計算の結果を収集するためのRDDの作成

どのように次のコードを置き換えるために、ループ(または代替)を使用することができ:

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(NステップRDDSを作成し、最後に一緒に、次いでジッピングはまた、50 RDDS限りOKであろう

+0

Iを生成するscalazから 'Stream.unfold'を使用したいですステップのストリームを作成し、次にそれを圧縮する – Reactormonk

答えて

6

は、再帰関数がうまくいく)シード=(ステップ(N-1).MAX)条件を尊重する繰り返し作成されます。

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

テイル再帰は、スタックを吹き飛ばしているRDD系統からあなたを守ることはできません:) – zero323

+0

@ zero323合意しました。しかし、この問題は質問の要件に内在しています。どんな答えも同様の問題を抱えるでしょう。 –

+0

テールに最適化されないシーンの背後にある再帰的なデータ構造を構築していることを指摘しておきます。それ以上のことはありません:)そして実際には、チェックポイントを使用して問題を解決して回避することができます。それは単一のジッパーなしでも解決できます:) – zero323

関連する問題