2017-10-10 11 views
0

小さな遅延で特定のトピックのカフカでコンシューマーを始めたい。具体的には、メッセージを作成してから特定の時間が経過した後、消費者がトピックからメッセージを消費し始めるようにします。誰もがそれを可能にするためにカフカに任意のプロパティまたはオプションがあると言うことができます。前もって感謝します。Apache kafka - コンシューマー遅延オプション

+0

とすぐにプロデューサーが、カフカのログにそれが行くKafka' 'でメッセージを送信し、消費者がそれを消費するために利用可能になりました。私は、すべてのメッセージに関して消費を遅らせるオプションを提供するカフカドキュメンテーションのプロパティは表示されません(もしあれば、それについてもっと知っていればうれしいです)。しかし、あなたはプロデューサーに比べて少し遅れてコンシューマーを始めることができるように、メッセージの消費を管理できます。 – Explorer

答えて

0

スパークストリーミングについても同じことをしました。私は、アプローチがあなたにも合うことを願っています。

アイデアは非常に簡単です - Thread.sleepを使用してください。 kafkaから新しいメッセージを受け取ったら、処理する前にどれくらいの時間寝る必要があるかを計算できます。アイデアのための

擬似コード:

message = getNextMessageFromKafka() 
sleepMs = Math.max(0, currentTime - message.timestamp) 
Thread.sleep(speepMs) 
do processing 
+0

アイデアは完全に1ms間隔ごとにトピックがいくつかのデータで充実し、消費者が消費し、スレッドが1msデータごとにスリープして、これが正しく進行し、他の処理ロジックがryteと呼ばれる!!!! –

関連する問題