私は、ファイルストリームをチェックポイントするSparkストリーミングアプリケーションを開発しました。私は糸の上に私のコードを実行していますしかし、私は上記のコードからsaprkストリーミングアプリケーションの例外処理
yarn.ApplicationMaster: User class threw exception:
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected])
- field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext)
- object (class UnionStream$$anonfun$creatingFunc$3, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, [email protected])
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData,
を例外を取得
val fileStream=..
fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5))
//initiate the chekpointing
fileStream.foreachRDD(r=> {
try {
r.count()
} catch {
case ex: Exception => {
ssc.stop(true, true)
}
}
}
)
:私は、次のとおりである任意のドライブexception..myコードに私のストリーミングアプリケーションを停止する必要がありますクラスタモード..
質問がありますか? r.count()は常にドライバノード上で実行されますか?ドライバーから例外を取得する必要があります。 – mahdi62
foreachRDD内で何をしていても、タスクを処理するノードに行くことはありません。しかし、あなたが望むのは、エラーが発生した場合にコンテキストを停止し、ドライバからのコンテキストにのみアクセスできるようにすることです。 –