2017-02-03 21 views
2

私はすでにGoogleグループに質問しましたが、まだ回答がありませんでした。これをここに投稿すると、別のユーザーになります。Reactive-Kafka:例外的に消費者を一時停止し、要求に応じて再試行する方法

Reactive-Kafkaを使用しています。メッセージの処理中に例外が発生した場合、コンシューマにメッセージを送信しないようにするシナリオを以下に示します。メッセージは、規定された時間後に、または消費者側からの明示的な要求の後に再試行されるべきである。現在のアプローチでは、消費者のデータベースがいつかダウンしても、カフカからの読み取りとメッセージの処理を試みますが、DBの問題のため処理が失敗します。これにより、アプリケーションが不必要にビジー状態になります。これに代えて、規定の時間(たとえば、再試行するのに30分待つ)の間、メッセージを受信するように消費者を一時停止する必要があります。 このケースの処理方法はわかりません。

同じことが可能ですか?何か不足していますか?ここで

は、反応性カフカから採取したサンプルコードです:この目的のためにrecoverWithRetriesコンビネータは

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     Future { 
      /** 
      * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
      */ 
     }.map(_ => msg.committableOffset).recover { 
      case ex => { 
      /** 
       * HOW TO DO ???????? 
       * On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time 
       * or on demand from the last committed offset 
       */ 
      throw ex 
      } 
     } 
     } 
     .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) => 
     batch.updated(elem) 
     } 
     .mapAsync(3)(_.commitScaladsl()) 
     .runWith(Sink.ignore) 

答えて

0

があります。参考までにthis answerおよびdocsを参照してください。

あなたは

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
     .mapAsync(1) { msg => 
     Future { 
      /** 
      * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
      */ 
     }.map(_ => msg.committableOffset) 

あなたのソースを抽出でき、その後、あなたはおそらくマップするために

src 
    .recoverWithRetries(attempts = -1, {case e: MyDatabaseException => 
    logger.error(e) 
    src.delay(30.minutes, DelayOverflowStrategy.backpressure)}) 
    ... 

(しようと再試行= -1無限に再試行を意味する)

+0

お返事ありがとうございます。 :)私はそれを試してみましょう。 –

1

注意が必要になりますかsrcの実体化された値はakka.kafka.scaladsl.Consumer.Controlからakka.NotUsedまでであり、recoverWithRetries

val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1")) 
    .mapAsync(1) { msg => 
    Future { 
     /** 
     * Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down 
     */ 
    }.map(_ => msg.committableOffset) 
    .mapMaterializedValue(_ => akka.NotUsed) 
関連する問題