は誰でも以下queries.Iから私を助けることができるがカフカ-クライアント-0.10.1.1(単一ノードシングルブローカー)auto.create.topics.enableのカフカクライアントAPI質問
デフォルト値を使用していますがtrueです。
1.Iは消費する
kafkaProdcuer<String,String> producer> producer...
producer.send(new ProducerRecord<String, String>("my- topic","message"));
producer.close();
を使用してトピックにメッセージを送信しています:
kafkaConsumer<String,String> consumer....
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(200);
while(true){
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
問題は、私が最初に消費者を実行するとき、それは値を取得していないです。そして私はプロデューサーを実行し、消費者を再び動かして値を得る必要があります。いくつかの時間私はプロデューサーを3回実行する必要があります。 これはなぜこのように機能しますか? enable.auto.commitプロパティがfalseの場合
2)enable.auto.commit = falseを
は、同じ消費者がメッセージを複数回を読むことはできますか?
3)第一point.Howで私の消費者のコードを考えると、私は
kafkaビンにはコンソールコンシューマーがあります。消費者がデータを消費できない間に試してみることができます。可能であれば、producer.flush()を追加してみてください。 3番目の質問については、ストリーミングプログラムがバッチの終わりを知る方法はありませんが、タイムアウトスレッドを設定して、データが消費されずにタイムアウトを監視することができます。 – Lhfcws
はいbinコンシューマでテストしましたが、相関ID 1のメタデータを取得するときにエラーが発生しました:{my-topic-106 = LEADER_NOT_AVAILABLE}(org.apache.kafka.clients.NetworkClient) – jena84
データを作成しましたか最近あなたの消費データの前に?デフォルトでは、カフカはあなたのデータを3日間保管しておくだけです。 – Lhfcws