2016-03-24 14 views
0
var cnt = 0 
val newRDD = oldRDD.map({ list => 
     ...some work 
     cnt = cnt + 1 
     println(cnt) // print 1, 2, 3.. well 
     newList // will store in new RDD 
    }) 

//out side of map 
println(cnt) // It's 0. Why? 

ログとしては、最初にマップの内側に印刷してから地図を印刷します。 cntの値が格納されないのはなぜですか?スカラ値がスパークに格納されていません

答えて

4

スパーク変換が純関数なければならない - 状態を変更したり、任意の副作用を有することなく、出力を生成し、入力を受信します。あなたの例はこれらに違反しています。

  • cntの初期値は、それをシリアライズさmapへの引数として渡された匿名関数は、シリアル化され、労働者に送ら
  • ある
  • cntが0ときである:ここでされて何が起こる

    デシリアライズ各ワーカー

  • ここで、各ワーカーは値でドライバアプリケーションが別の方法として

変わらないまま、それは...、あなたは「カウンター」のこれらのタイプを達成するためにスパークのAccumulatorsを使用することができますローカル

  • +0

    おかげで、それは私をたくさん助けて:) –

    2

    閉鎖で外部変数を使用すると、Sparkフレームワークで自動的にワーカーノードに送信されます。各タスクは変数の新しいコピーを取得しますが、タスク内の変数を更新すると(これはコードで起こります)、フレームワークはそれを返信して残りのプログラムと同期させません。そうするには高価です。

    クロージャで外部変数を使用している場合は、その変数を読み取り専用変数と考えることができます。

    マップした要素の数をカウントする場合は、最初にoldRDD.count()/newRDD.count()を使用できます(要素をフィルタリングしないと同じ結果が得られるようです)。