夜間にジョブを実行すると、カフカキュー内のすべてのメッセージが取得され、プロセスが実行されます。私はメッセージを受け取ることができますが、カフカストリームはより多くのメッセージを待っていて、処理を続行できません。私は、次のコードを持っている:キュー内のすべてのカフカメッセージを取得し、Javaでストリーミングを停止する
...
private ConsumerConnector consumerConnector;
private final static String TOPIC = "test";
public MessageStreamConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", "localhost:2181");
properties.put("group.id", "test-group");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
}
public List<String> getMessages() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
List<String> messages = new ArrayList<>();
while (it.hasNext())
messages.add(new String(it.next().message()));
return messages;
}
コードは、メッセージを取得することができますが、それはそれはラインにとどまるの最後のメッセージを処理する場合:私は、すべて取得することができますどのように
while (it.hasNext())
質問ですが、カフカからのメッセージは、ストリームを停止し、私の他のタスクを続行します。
私はあなたが私にカフカストリームは最初から消費するサポートしていないようだ
おかげ
可能性のある[Kafka Consumerの複製がjavaの.hasNextにぶら下がっている](0120)を参照してください。 – zaynetro
しかし、私は最高だとは思わないこれを行う習慣は、例外がスローされるまで待つことです。もし私のプロセスがタイムアウトが設定されている時間がもっとかかる場合はどうすればいいですか –
KafkaStreamではなく、まっすぐなKafka Consumerを使ってはいけませんか?ストリームは自然に生き続けるでしょう。 – NegatioN