私はカフカコンシューマーをいくつかのトピックから読んでいます。処理には多くの時間がかかりますが、(優先度の低い)トピックには常に多くのメッセージがありますが、できるだけ早く処理する必要があります。カフカコンシューマー - より高い優先度のトピック
これはDoes Kafka support priority for topic or message?と似た質問ですが、これは古いAPIを使用しています。新しいAPI(0.10.1.1)で
は、メソッド
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
があります。しかし、それが効果的にそこに新しいメッセージが優先度の高いトピックであり、消費を一時停止する必要があることを検出する方法を、私にははっきりしていません他のトピックから。
アイデア/例はありますか?
監視しているパーティションのendOffsetsが、そのパーティションの最後にコミットされたオフセットより大きいかどうかを確認できます。これがどのように動作するかは具体的に実装されますが、ポーリングする前にもっと多くのメッセージがあるかどうかを知ることができます – dawsaw