0
私はカフカの例をテストしたいと思います。私は プロデューサーカフカ0.10.0.1を使用しています:KafkaConsumerがオフセットから読み取らない0
object ProducerApp extends App {
val topic = "topicTest"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 20)
{
val record = new ProducerRecord(topic, "key "+i," value "+i)
producer.send(record)
Thread.sleep(100)
}
}
消費者(トピックを「topicTestは、」1つのパーティションで作成されます):
object ConsumerApp extends App {
val topic = "topicTest"
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(scala.List(topic).asJava)
while (true) {
consumer.seekToBeginning(consumer.assignment())
val records:ConsumerRecords[String,String] = consumer.poll(20000)
println("records size "+records.count())
records.asScala.foreach(rec => println("offset "+rec.offset()))
}
}
問題は、消費者からの読み取りないということです最初の反復ではオフセット0になりますが、他の反復ではオフセット0になります。私はその理由を知りたいと思っています。消費者にすべての反復でオフセット0を読み取らせるにはどうしたらいいですか?
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...
が、得られた結果は次のとおりです: 期待される結果がある私は、正確な間違いであるかを把握することができません
records size 4
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...
ありがとうございますが、コンシューマーがすべてのレコードをすべての反復で(オフセット0からオフセット5まで)読み込むようにしたいと思います。問題は、最初の反復でオフセット0から読み取られないということです。 – DaliMidou
最初の繰り返しでは、常にオフセット2からランダムに読み取りますか? – shakeel
それはオフセット2からランダムに読み取ります – DaliMidou