2
スパークストリーミングを使用して統計処理を行っています。ここに私のコードです:スパークストリーミングエラー:エグゼキュータに送信する前にアキュムレータを登録する必要があります
"Accumulator must be registered before send to executor"
私はすでにここにアキュムレータを登録しています:
val accum = sparkSession.sparkContext.longAccumulator("Total Count")
なぜ私は、私は上記のプログラムを実行すると、私はランタイムエラーを得た
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(60))
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)//create a stream
val accum = sparkSession.sparkContext.longAccumulator("Total Count")
eventHubsStream.foreachRDD(rdd => {
accum.add(rdd.count())
SavetoStorage(accum); //save to storage
})
そのようなエラーを受け取りますか?
ありがとうございました