2017-09-27 15 views
0

私は私のフレームワークでKafkaプロデューサー - コンシューマーモデルを使用しています。消費者側で消費されたレコードは、後にelasticsearchにインデックスされる。ここで私は、ESがダウンしている場合、ESがアップするまでカフカ消費者を一時停止する必要があるユースケースを持っています。それが終わったら、消費者を再開し、最後に残した場所からレコードを消費する必要があります。 これは@KafkaListenerでは実現できないと思います。誰も私にこれのための解決策を教えてもらえますか?私はこのために私自身のKafkaListenerContainerを書く必要があることを理解しましたが、正しく実装することはできません。どんな助けでも大歓迎です。カフカ消費者を一時停止する方法は?

答えて

0

可能な解決法がありますが、簡単な方法の1つはKafkaConsumer APIを使用することです。 KafkaConsumerでは、次回のpoll(...)呼び出しで取得されるトピックの位置を追跡します。あなたの問題は、カフカからレコードを取得した後で、Elastic Searchに挿入できないことがあります。この場合、コンシューマの位置をリセットするルーチンを作成する必要があります。この場合は、consumer.seek(partition、consumer.position(partition)-1)になります。これにより、位置が以前の位置にリセットされます。この時点で、良い方法はパーティションを一時停止することです(これにより、サーバーはリソースのクリーンアップを行うことができます)。 ESが利用可能になると、コンシューマーの再開を呼び出し、通常のポールインサートサイクルを続行します。考察

後に編集

は、指定されたライフサイクル・メソッドとスプリングBeanを作成します。 Beanの初期化メソッドで、KafkaConsumerをインスタンス化します(任意のソースからコンシューマの設定を取得します)。メソッドからコンシューマおよび更新ESと対話するスレッドを開始すると、残りのデザインは上記のとおりです。これは単発モデルです。スループットを向上させるには、Kafkaから取得したデータをメモリキュー内に小さくし、ディスパッチャスレッドでメッセージを取得し、ESを更新するためのプールスレッドに渡すようにしてください。

+0

私は、それを行う方法についての理論を理解して私の問題は、インプリメンテーションである、あなたの答えに非常に近いものと考えていました。同じコードスニペットを投稿できますか? –

+0

あなたはおそらくアノテーションでこれを行うことはできません。もしあなたがKafkaConsumerを使ってうまくいけば、実装は簡単です。リストされたすべてのメソッドが利用できます。実装のどこに問題があるか – Ironluca

+0

My Consumerは@KafkaListenerで注釈を付けられたPOJOリスナーです。このコンシューマのコンフィグレーション用のConsumerConfigクラスがあります。次のように私のリスナーの方法であって @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID、containerFactory = "kafkaListenerContainerFactory") 公共ボイドプロセス(ConsumerRecord レコード){ logger.info( "レコード:" +レコード)。 } 私はシークや他の方法を適用するための「消費者」オブジェクトを取得する場所を取得していません。 –

0

消費者をむしろ一時停止することをお勧めします。同じメッセージを何度もやり直してから、メッセージが正常に消費されたら、オフセットをコミットできません。例については

@Retryable

であなたの方法を注釈とのtry/catchであなたの方法をブロックし、catchブロックで新しい例外をスローします。 ListenerFactory設定について

プロパティを追加します。

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); 
factory.getContainerProperties().setAckOnError(false); 
関連する問題