私たちはkafkaからメッセージを引き出し、forEachPartiton
変換を使用して個々のメッセージを処理するスパークストリーミングプログラムを用意しています。スパークストリーミングで例外をスローする方法
処理機能に特定のエラーがある場合は、例外を戻してプログラムを停止したいと考えています。同じことは起こっていないようです。以下は実行しようとしているコードです。上記のコードで
JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream(...);
stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() {
public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception {
rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() {
@Override
public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception {
while (itr.hasNext()) {
KafkaDTO dto = itr.next();
try{
//process the message here.
} catch (PropertiesLoadException e) {
// throw Exception if property file is not found
throw new PropertiesLoadException(" PropertiesLoadException: "+e.getMessage());
} catch (Exception e) {
throw new Exception(" Exception : "+e.getMessage());
}
}
}
});
}
}
我々はPropertiesLoadException
を投げても、プログラムが停止しないとストリーミングが続行されます。スパーク設定で設定した最大再試行回数は4回です。ストリーミングプログラムは、4回の失敗後も継続します。プログラムを停止するには、どのように例外をスローする必要がありますか?
ご回答いただきありがとうございますが、内部メソッドから「jssc」にアクセスするにはどうすればよいですか。 jsssはドライバにありますが、例外はExecutorで捕捉されます。 –