2017-11-07 5 views
0

KafkaConsumerのすべてのコンシューマーレコードがフェッチされるまで続けることが重要なユースケースがあります。このユースケースでは、パイプラインに入ることはありません。絶対的に確実にフェッチすることは何もないことを保証するための適切な方法は何ですか?Kafka - 空になるまで消費する

+0

これは正しい方法ですか?あなたはバッチ1にストリーミングソリューションを作っているようですね? –

+0

ゲートウェイが完了し、メッセージがまだCassandraに届いていない場合は、最後に発行されたリビジョン番号を見つけるためにキュー内のメッセージを消費します。 –

+0

新しいデータが追加されていないことがわかっている場合、 'Consumer#endOffsets'でログの終わりを取得し、' Consumer#position'が終わりに達すると読み取りを終了できます。 –

答えて

1

Kafkaは無限のデータストリームを処理するように設計されているため、「すべて消費する」とは誰も何らかの期間(1分)、1時間などの間にデータを送信しないことを意味します。

あなたは(擬似コード)のようなものを使用することができます。

int emptyCount = 0; 
while (true) { 
    records = Consumer.poll(500); 
    if (records.empty()) { 
     emptyCount++; 
     if (emptyCount >= 100) { 
     break; 
     } 
     continue; 
    } 
    emptyCount = 0; 
    ...process records... 
} 

あなたは空のサイクルの世論調査&数のチューニングタイムアウトが必要な待機時間に到達することができます。

+0

これはうまくいくかもしれません。今私はseekToBeginning/seekToEndを見ていて、2つが同じで、ストリームが空であるとみなしているが、これは意味的に正しくないかもしれないと計算するためにpositionを使用しています。 –

+0

この処理と長いタイムアウトでポーリングを行うことの意味の違いは何ですか?このチェックが行われている間、カフカには何も入力されていません。 –

+0

タイムアウトがheartbeatより長い 'poll'を呼び出すと、消費者は死んだとみなされます。 –

関連する問題