2017-08-02 6 views
0

私はtransactionCount変数を100にしようとしましたが、私は0を得ました。私は常時1つのパーティションしか持っていないRDDを持っています。私はRDDを処理するこのようなコードを持っています:foreachPartitionの変数への奇妙な影響

var transactionCount = -1 

payment_rdd.foreachPartition { partitionOfRecords => 
    // this line affect 100 to transactionCount since the I have 100 record 
    // in my RDD so in my partition 
    transactionCount = partitionOfRecords.size 
    partitionOfRecords.foreach { record => 
     //I procces each record 
    } 
    try { 
    // this line keep 100 to transactionCount 

    //another process 
    } 
    catch { 
    case _: Throwable => { 
     // I never pass here 
     log.error("Cannot process correctly") 
     transactionCount = 0 
    } 
    } 
} 
return transactionCount 

私は100にもかかわらず-1を得て、なぜ私は理解できません。 あなたはもっと良い解決策がありますか?おかげ

+0

は常にあなたの 'try'ブロック増加例外である場合があります。

私はTryとアキュムレータを使用すると思います。 –

+0

このような突然変異は一般的には良いとは言えませんが、コメントで言及しているように、特に – cchantep

+0

という分布があります。例外は発生しません。 –

答えて

2

あなたはこのコードを実行します。

  • スパークは、閉鎖を計算します。
  • クロージャによって必要とされる各変数をシリアライズし、エグゼキュータに送信します。
  • コードが実行されると、各エグゼキュータはデシリアライズされた変数のローカルコピーを変更します。

これは説明されており、また、あなたがこれを行うことはできませんprogramming guide

で説明:

transactionCount = partitionOfRecords.size 

Iteratorsは一度だけ横断することができ、あなたのサイズを計算した後、空になります。

val transactionCount = spark.sparkContext.longAccumulator 

rdd.foreach { record => { 
    if Try { 
    // your code goes here 
    }.isSuccess transactionCount.add(1L) 
}} 
+0

ありがとう、それは非常にうまくいく –

関連する問題