2016-10-11 23 views
0

スパーク1.6.0(私はスパークとスケーラに慣れていません)では、コレクションを反復処理して、繰り返しが配列上にあるときに配列にアイテムを追加すると配列が空に見えます。繰り返し後にスパーク配列が空です

var testing = unlabeled.map { line => 
    val parts = line.split(',') 
    val text = parts(7).split(' ') 
    (line, htf.transform(text)) 
} 

var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)] 
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)] 

for(counter <- 1 to 5){ 

    logger.info("this is the " + counter + " run -----------------") 
    for (i <- testing) { 
    val label = model.predict(i._2).toString 
    //  logger.info(i._1.split(",")(7)) 
    //  logger.info(label) 
    var probs = model.predictProbabilities(i._2) 
    logger.info("prob 0 : " + probs(0)) 
    logger.info("prob 1 : " + probs(1)) 
    logger.info("--------------------- ") 

    if (probs(0).toDouble <= 0.95 && probs(1).toDouble <= 0.95) { 
     lowPropQueue.+=(i) 
    } else { 
     highPropQueue.+=((i._1 + "," + label , i._2)) 
    } 

    logger.info("size of high array : " + highPropQueue.length) 
    logger.info("size of low array : " + lowPropQueue.length) 

    } 

    logger.info("passed: " + lowPropQueue.length) 
    logger.info("NOT passed: " + highPropQueue.length) 

    var xx= sc.parallelize(highPropQueue).collect() 
    var yy = sc.parallelize(lowPropQueue).collect() 

    logger.info("passed: " + xx.length) 
    logger.info("NOT passed: " + yy.length) 
... 
} 

が、ログに基づいて内部ループ、すなわち、配列に要素を追加しているようだ:

16/10/11十一時22分31秒INFO SelfLearningMNB $:高配列のサイズ: 500

16/10/11 11時22分31秒INFO SelfLearningMNB $:低いアレイのサイズ:83

16/10/11 11時22分31秒INFO SelfLearningMNB $:PROB 0:0.37094327822665185

16/10/11 11:22:31情報SelfLearningMNB $:prob 1:0.6290567217733481

16/10/11 11:22:31情報SelfLearningMNB $:------------ ---------

16/10/11 11時22分31秒INFO SelfLearningMNB $:高配列のサイズ:500

16/10/11 11時22分31秒INFOのSelfLearningMNBの$ :低配列のサイズ:84

16/10/11 11時22分31秒INFO SelfLearningMNB $:PROB 0:0.16872929936216619

16/10/11 11時22分31秒INFO SelfLearningMNB $:PROB 1:

16/10/11 11時43分53秒INFO SelfLearningMNB $::渡さ:0

0.8312707006378338

しかし、内側のループは、私はこれを取得終了したとき16/10/11 11:43:53情報SelfLearningMNB $:NOT合格:0

何が起こっているのですか?

EDIT

どのように執行からデータを取得したり、HDFSへのエグゼキュータからのデータを保存することができますので、彼らは後にマスターノードから読み取ることができますか?

答えて

1

TL; DRこれはSparkでは動作しません。

何が起こっているのですか?

  • 各エグゼキュータはlowPropQueuehighPropQueueの独自のコピーを取得します。反復ローカルコピーはスレッドセーフではありませんArrayBuffer

FYIナイーブ追記を破棄された後

  • は反復中のローカルコピーが
  • 修正されています。

  • +0

    私はそれについてですが。しかし、エグゼキュータからどのようにデータを "グローバル"配列に格納することが可能ですか? – bill

    +0

    アキュムレータを試してみることはできますが、アクセスを同期させ、コードの縮尺を変えないでください。 –

    +0

    私はいくつかの検索を行いましたが、このアプローチはSparkには適切ではありません。私はすべてをマッピングしなければならなかったが、それはうまくいった – bill