2017-11-21 7 views
0

私のKafkaコンシューマは2つのスレッドを持ち、パーティションの数は10と言います。つまり、コンシューマスレッドごとに5つのパーティションがあります。特定のレコードを処理する必要がある時間を節約しています。今度はpartition1のrecord1を10時間後に選択する必要がある場合、スレッドは次のパーティションに移動して、次のパーティションを選択できるかどうかを確認します。Kafka Javaコンシューマ

例:

P1 - 8 
P2 - 7 
P3 - 6 
P4 - 5 
P5 - 4 

今パーティションP1上のデータは8時間後に取りに行く必要があり、現在の時間が、私は私のスレッドが8時間を待つために作る場合、私はものの1時間待って、6時間です私はP3、P4、P5を処理することができました。

どうすればいいのか教えてください。

答えて

0

私は簡単に行くだろう:パーティションごとにコンシューマーを持っている。実際に特定のパーティションを選択してコンシューマに割り当てるには、KafkaConsumer.assign() APIがあります。このようにして、それぞれは、上記のスケジュールに基づいて、独自の独立したロジックを処理します。

+0

こんにちはArtem、迅速な対応に感謝します。新しいJVMサーバーを導入することで可能な処理速度を向上させる必要がある場合など、いくつかのパーティションを追加する予定です。 – singhal

+0

さて、サブスクリプション機能では解決策はありません。ブローカがコンシューマのパーティションをスキャンする順序を保証するものではありません。私が購読しているだけでは、特定のパーティションに対処するために消費者を物理的に強制する方法はわかりません。 –

関連する問題