2017-04-13 6 views
2

スパークストリーミングを使用して統計処理を行っています。ここに私のコードです:スパークストリーミングエラー:エグゼキュータに送信する前にアキュムレータを登録する必要があります

"Accumulator must be registered before send to executor"

私はすでにここにアキュムレータを登録しています:

val accum = sparkSession.sparkContext.longAccumulator("Total Count") 

なぜ私は、私は上記のプログラムを実行すると、私はランタイムエラーを得た

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(60)) 
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)//create a stream 

val accum = sparkSession.sparkContext.longAccumulator("Total Count") 

eventHubsStream.foreachRDD(rdd => { 
    accum.add(rdd.count()) 
    SavetoStorage(accum); //save to storage 
}) 

そのようなエラーを受け取りますか?

ありがとうございました

答えて

1

アキュムレータを登録すると修正できます。スパーク2.2では、次のコードでは、うまく

を働く
val sc = spark.sparkContext 
    sc.register(accum, <Name_of_your_accumulator>); 
    ... next actions with accumulator ... 

私はそれが次のリリース

で切断されないことを願っています
関連する問題