2017-07-17 23 views
0

私はカフカコンシューマーをいくつかのトピックから読んでいます。処理には多くの時間がかかりますが、(優先度の低い)トピックには常に多くのメッセージがありますが、できるだけ早く処理する必要があります。カフカコンシューマー - より高い優先度のトピック

これはDoes Kafka support priority for topic or message?と似た質問ですが、これは古いAPIを使用しています。新しいAPI(0.10.1.1)で

は、メソッド

KafkaConsumer::pause(Collection) 
KafkaConsumer::resume(Collection) 

があります。しかし、それが効果的にそこに新しいメッセージが優先度の高いトピックであり、消費を一時停止する必要があることを検出する方法を、私にははっきりしていません他のトピックから。

アイデア/例はありますか?

+1

監視しているパーティションのendOffsetsが、そのパーティションの最後にコミットされたオフセットより大きいかどうかを確認できます。これがどのように動作するかは具体的に実装されますが、ポーリングする前にもっと多くのメッセージがあるかどうかを知ることができます – dawsaw

答えて

1

は最後に、私はdawsawよう勧め、という解決 - 処理ループでは、私は私からの読み取りすべてのトピック/パーティションについて格納します - 私は位置を使用することはできません

  • beginningOffsets
  • コミットendOffsets
  • なぜなら、私はトピックではなくパーティションに加入しているからです。任意の優先順位トピックの

    たび(endOffset - commited) > 0

は、私は、非優先トピックを consumer.pause()を呼び出すと、すべての優先順位のトピックの (endOffset - commited) == 0後に再びそれらを再開する。

+0

問題を解決する戦略を教えてください。優先度の低いメッセージと優先度の高いメッセージが(全体で10 Gbs)あるとします。私たちは複数の消費者と複数の生産者を持っています。私たちが消費者を一時停止させても、あなたの考えを実現するために、他のすべてのトピックのプロデューサーを一時停止する必要があります。右? 100のサービスと10のトピックのエコシステムではほとんど不可能と思われるので、これについての経験はありますか? - はい、私はあなたの関連する他の質問をこの問題について読みました。ありがとう – JSBach

+0

いいえ - プロデューサーを一時停止する必要はありません - アイデアは、単一の消費者がいくつかのトピック(これらのトピックのうちのいくつかは優先度が高く、その他は通常の優先度)を購読しているということです。新しいメッセージをポーリングする前に、優先度の高いトピックについて遅延をチェックする必要があります。これらのラグのいずれかがゼロでない場合は、消費者の「スチール」時間ではなく、通常の優先度のトピックについてサブスクリプションを一時停止する必要があることを意味します。ハイプライオリティのトピックからすべてのメッセージを処理した後、再びノーマル優先度のトピックを再開できます。 – miran

+0

ありがとうございます。私はちょうど無視することはできません。しかし、それは大きなシステムでは悪い匂いがする。ダムドアが膨大な量のデータのために開かれたら、私は今チェックしなければならないし、私はこの低優先順位キューでリソースを無駄にしている。どして私がこんな事に?右。とにかく。もう一度ありがとう – JSBach

0

あなたはposition()メソッドとcommitted()メソッドを混在させることができると思います。 position()メソッドはフェッチされる次のレコードのオフセットを取得し、commit()メソッドは指定されたパーティションの最後にコミットされたオフセットを取得します(ドキュメントの説明を参照)。 より低い優先度をポーリングする前に、より高い優先度のためにposition()とcommitted()をチェックすることができます。 position()がcommitted()よりも高い場合は、より高い優先度()でより低い優先度とpoll()を一時停止し、より低い優先度を再開することができます。

関連する問題