私は私のフレームワークでKafkaプロデューサー - コンシューマーモデルを使用しています。消費者側で消費されたレコードは、後にelasticsearchにインデックスされる。ここで私は、ESがダウンしている場合、ESがアップするまでカフカ消費者を一時停止する必要があるユースケースを持っています。それが終わったら、消費者を再開し、最後に残した場所からレコードを消費する必要があります。 これは@KafkaListenerでは実現できないと思います。誰も私にこれのための解決策を教えてもらえますか?私はこのために私自身のKafkaListenerContainerを書く必要があることを理解しましたが、正しく実装することはできません。どんな助けでも大歓迎です。カフカ消費者を一時停止する方法は?
答えて
可能な解決法がありますが、簡単な方法の1つはKafkaConsumer APIを使用することです。 KafkaConsumerでは、次回のpoll(...)呼び出しで取得されるトピックの位置を追跡します。あなたの問題は、カフカからレコードを取得した後で、Elastic Searchに挿入できないことがあります。この場合、コンシューマの位置をリセットするルーチンを作成する必要があります。この場合は、consumer.seek(partition、consumer.position(partition)-1)になります。これにより、位置が以前の位置にリセットされます。この時点で、良い方法はパーティションを一時停止することです(これにより、サーバーはリソースのクリーンアップを行うことができます)。 ESが利用可能になると、コンシューマーの再開を呼び出し、通常のポールインサートサイクルを続行します。考察
後に編集
は、指定されたライフサイクル・メソッドとスプリングBeanを作成します。 Beanの初期化メソッドで、KafkaConsumerをインスタンス化します(任意のソースからコンシューマの設定を取得します)。メソッドからコンシューマおよび更新ESと対話するスレッドを開始すると、残りのデザインは上記のとおりです。これは単発モデルです。スループットを向上させるには、Kafkaから取得したデータをメモリキュー内に小さくし、ディスパッチャスレッドでメッセージを取得し、ESを更新するためのプールスレッドに渡すようにしてください。
消費者をむしろ一時停止することをお勧めします。同じメッセージを何度もやり直してから、メッセージが正常に消費されたら、オフセットをコミットできません。例については
:
@Retryable
であなたの方法を注釈とのtry/catchであなたの方法をブロックし、catchブロックで新しい例外をスローします。 ListenerFactory設定について
プロパティを追加します。
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
- 1. 高レベルのカフカ消費者を一時停止する
- 2. カフカ消費者シェルスクリプト
- 3. カフカ - 消費者。 commitAsync
- 4. カフカ消費者ポーリングタイムアウト
- 5. 雲のカフカ消費者
- 6. カフカ消費者行動
- 7. カフカ消費者 - JMXプロパティ
- 8. カフカ10.2新しい消費者対古い消費者
- 9. なぜカフカの消費者は、消費に時間がかかりますか?
- 10. カフカの消費者状態を確認する方法
- 11. カフカ0.9:カフカはカフカの消費者の私の読書から
- 12. 春の統合カフカは:消費者
- 13. ウサギの消費者は、channel.basic.ackの後に消費を停止します
- 14. カフカ - 異なる速度の消費者
- 15. カフカ消費者がデータを消費しない
- 16. Reactive-Kafka:例外的に消費者を一時停止し、要求に応じて再試行する方法
- 17. 別の消費者が放棄した時点からカフカ消費者を開始する
- 18. カフカの消費者のエラーノードの障害
- 19. カフカの消費者からスパークストリーミング
- 20. 複数話題のカフカ消費者
- 21. カフカ消費者団体の問題
- 22. カフカ消費者オフセット最大値?
- 23. カフカ消費者スレッド安全性
- 24. RabbitMQ消費者プロセスの現在のメッセージを停止する
- 25. カフカ消費者が最も近いブローカーを選択する方法は?
- 26. カフカ消費者はサーバー内のレコードを消費していません
- 27. カフカの消費者オフセットを決定する要因は?
- 28. カフカ消費者を特定のオフセットで止める方法はありますか?
- 29. カフカ一度の消費保証
- 30. @RabbitListenerでメッセージの消費を停止する方法
私は、それを行う方法についての理論を理解して私の問題は、インプリメンテーションである、あなたの答えに非常に近いものと考えていました。同じコードスニペットを投稿できますか? –
あなたはおそらくアノテーションでこれを行うことはできません。もしあなたがKafkaConsumerを使ってうまくいけば、実装は簡単です。リストされたすべてのメソッドが利用できます。実装のどこに問題があるか – Ironluca
My Consumerは@KafkaListenerで注釈を付けられたPOJOリスナーです。このコンシューマのコンフィグレーション用のConsumerConfigクラスがあります。次のように私のリスナーの方法であって @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID、containerFactory = "kafkaListenerContainerFactory") 公共ボイドプロセス(ConsumerRecord , ?>レコード){ logger.info( "レコード:" +レコード)。 } 私はシークや他の方法を適用するための「消費者」オブジェクトを取得する場所を取得していません。 –