2016-09-28 9 views
1

私は、ファイルストリームをチェックポイントするSparkストリーミングアプリケーションを開発しました。私は糸の上に私のコードを実行していますしかし、私は上記のコードからsaprkストリーミングアプリケーションの例外処理

yarn.ApplicationMaster: User class threw exception: 
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
org.apache.spark.streaming.StreamingContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 
    - field (class: UnionStream$$anonfun$creatingFunc$3, name: ssc$1, type: class org.apache.spark.streaming.StreamingContext) 
    - object (class UnionStream$$anonfun$creatingFunc$3, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 
    - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream) 
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, [email protected]) 
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) 
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, 

を例外を取得

val fileStream=.. 
    fileStream.checkpoint(Duration(batchIntervalSeconds * 1000 * 5)) 

//initiate the chekpointing 
fileStream.foreachRDD(r=> { 
    try { 
    r.count() 
    } catch { 
    case ex: Exception => { 
     ssc.stop(true, true) 
    } 

    } 
} 
) 

:私は、次のとおりである任意のドライブexception..myコードに私のストリーミングアプリケーションを停止する必要がありますクラスタモード..

答えて

0

あなたは、{}} forEachRDDの外にキャッチし、試してみる+キャッチ{内foreachrddの呼び出しをラップする試みを取るために、この

try { 
//initiate the chekpointing 
fileStream.foreachRDD(r=> { 
    r.count() 
    } 
} 
} catch { 
    case ex: Exception => { 
     ssc.stop(true, true) 
    } 
) 
のようなものを試してみました

sparkStreamingContextを必要とする例外ハンドルを含むforeachRDDブロック内のすべてのコードをsparkが取っていて、現在のRDD上のプロセスを処理するノードに送信できるように直列化しようとしたようです。 SparkStreamingContextはシリアル化可能ではないため、その吹き飛ばします。

+0

質問がありますか? r.count()は常にドライバノード上で実行されますか?ドライバーから例外を取得する必要があります。 – mahdi62

+0

foreachRDD内で何をしていても、タスクを処理するノードに行くことはありません。しかし、あなたが望むのは、エラーが発生した場合にコンテキストを停止し、ドライバからのコンテキストにのみアクセスできるようにすることです。 –

0

foreachRDD呼び出し内で例外が発生した場合にsparkストリーミングアプリケーションを停止するには、foreachRDD内で例外をキャッチしようとしないでください。代わりに、try/catchブロック内でssc.awaitTermination呼び出しをラップし、そこからssc.stopを呼び出します。

val ssc = createStreamingContext()   
ssc.start() 
try { 
    ssc.awaitTermination() 
} catch { 
    case e: Exception => 
    ssc.stop(stopSparkContext = true, stopGracefully = true) 
    throw e // to exit with error condition 
}