2017-12-18 15 views
0

私たちはkafkaからメッセージを引き出し、forEachPartiton変換を使用して個々のメッセージを処理するスパークストリーミングプログラムを用意しています。スパークストリーミングで例外をスローする方法

処理機能に特定のエラーがある場合は、例外を戻してプログラムを停止したいと考えています。同じことは起こっていないようです。以下は実行しようとしているコードです。上記のコードで

JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream(...); 

stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() { 

    public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception {  
     rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() { 

      @Override 
      public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception { 
       while (itr.hasNext()) { 
        KafkaDTO dto = itr.next(); 
        try{ 
         //process the message here. 
        } catch (PropertiesLoadException e) { 
         // throw Exception if property file is not found 
         throw new PropertiesLoadException(" PropertiesLoadException: "+e.getMessage()); 
        } catch (Exception e) { 
         throw new Exception(" Exception : "+e.getMessage()); 
        } 
       } 
      } 
     }); 
    } 
} 

我々はPropertiesLoadExceptionを投げても、プログラムが停止しないとストリーミングが続行されます。スパーク設定で設定した最大再試行回数は4回です。ストリーミングプログラムは、4回の失敗後も継続します。プログラムを停止するには、どのように例外をスローする必要がありますか?

答えて

0

私はこれが最善のアプローチであるかどうかはわかりませんが、私たちはtryとcatchでメインバッチを囲んでいました。さらに、猶予期間中停止がオフ(false)であることを確認する必要があります。

例コード:

try { 
    process(dataframe); 
} catch (Exception e) { 
    logger.error("Failed on write - will stop spark context immediately!!" + e.getMessage()); 
    closeContext(jssc); 
    if (e instanceof InterruptedException) { 
     Thread.currentThread().interrupt(); 
    } 
    throw e; 
} 

して閉じる機能:Configで

private void closeContext(JavaStreamingContext jssc) { 
    logger.warn("stopping the context"); 
    jssc.stop(false, jssc.sparkContext().getConf().getBoolean("spark.streaming.stopGracefullyOnShutdown", false)); 
    logger.error("Context was stopped"); 
} 

:私はあなたのコードと、それは次のようになりますことを考える

spark.streaming.stopGracefullyOnShutdown false

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, streamBatch); 
JavaInputDStream<KafkaDTO> stream = KafkaUtils.createDirectStream(jssc, ...); 

    stream.foreachRDD(new Function<JavaRDD<KafkaDTO>, Void>() { 

     public Void call(JavaRDD<KafkaDTO> rdd) throws PropertiesLoadException, Exception { 

      try { 

       rdd.foreachPartition(new VoidFunction<Iterator<KafkaDTO>>() { 

        @Override 
        public void call(Iterator<KafkaDTO> itr) throws PropertiesLoadException, Exception { 
         while (itr.hasNext()) { 
          KafkaDTO dto = itr.next(); 
          try { 
           //process the message here. 
          } catch (PropertiesLoadException e) { 
           // throw Exception if property file is not found 
           throw new PropertiesLoadException(" PropertiesLoadException: " + e.getMessage()); 
          } catch (Exception e) { 
           throw new Exception(" Exception : " + e.getMessage()); 
          } 
         } 
        } 
       }); 

      } catch (Exception e){ 
       logger.error("Failed on write - will stop spark context immediately!!" + e.getMessage()); 
       closeContext(jssc); 
       if (e instanceof InterruptedException) { 
        Thread.currentThread().interrupt(); 
       } 
       throw e; 
      } 

     } 
    } 

さらに、私のストリームは、スパーク2.1のスタンドアロン(糸/メソスルではない)クライアントモードで動作していることに注意してください。さらに私はZKを使って私の自己を優雅に停止させる。

+0

ご回答いただきありがとうございますが、内部メソッドから「jssc」にアクセスするにはどうすればよいですか。 jsssはドライバにありますが、例外はExecutorで捕捉されます。 –

関連する問題