私はある条件で一時停止したいと思っているカフカの消費者を持っていて、後でそれを再開して前のすべてのメッセージを消費します。 1つのアイデアは、他のスレッドによって更新され、消費する前に共有フラグを使用することです。すなわち、iterator.next().message()
フラグの値をチェックします。それが本当であれば、他のものを消費しないでください。ちょうど私が正しい方向に考えているのか、それとももっと良い方法があるのかをチェックしたかったのです。高レベルのカフカ消費者を一時停止する
class KafkaConsumer implements Runnable {
KafkaStream<byte[], byte[]> topicStream;
ConsumerConnector consumerConnectorObj;
public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
final ConsumerConnector consumerConnectorObj) {
this.topicStream = topicStream;
this.consumerConnectorObj = consumerConnectorObj;
}
@Override
public void run() {
if (topicStream != null) {
ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
while (true) {
if (iterator != null) {
boolean nextFlag = true;
try {
iterator.hasNext();
} catch (ConsumerTimeoutException e) {
LOG.warn("Consumer timeout occured", e);
nextFlag = false;
}
if (nextFlag) {
byte[] msg = iterator.next().message();
}
}
}
}
}
}
ありがとうHarald私はそれを試してみると戻ってきます –