2017-11-14 10 views
0

私はカフカの例をテストしたいと思います。私は プロデューサーカフカ0.10.0.1を使用しています:KafkaConsumerがオフセットから読み取らない0

object ProducerApp extends App { 

val topic = "topicTest" 
val props = new Properties() 
props.put("bootstrap.servers", "localhost:9092") 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer") 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
val producer = new KafkaProducer[String, String](props) 
for(i <- 0 to 20) 
{  
val record = new ProducerRecord(topic, "key "+i," value "+i)  
producer.send(record)  
Thread.sleep(100)  
} 
} 

消費者(トピックを「topicTestは、」1つのパーティションで作成されます):

object ConsumerApp extends App { 
val topic = "topicTest" 
val properties = new Properties 
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer") 
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") 
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
val consumer = new KafkaConsumer[String, String](properties) 
consumer.subscribe(scala.List(topic).asJava)  
while (true) { 
consumer.seekToBeginning(consumer.assignment()) 
val records:ConsumerRecords[String,String] = consumer.poll(20000) 
println("records size "+records.count()) 
records.asScala.foreach(rec => println("offset "+rec.offset()))  
} 
} 

問題は、消費者からの読み取りないということです最初の反復ではオフセット0になりますが、他の反復ではオフセット0になります。私はその理由を知りたいと思っています。消費者にすべての反復でオフセット0を読み取らせるにはどうしたらいいですか?

records size 6 
offset 0 
offset 1 
offset 2 
offset 3 
offset 4 
offset 5 
records size 6 
offset 0 
offset 1 
offset 2 
offset 3 
offset 4 
offset 5 
... 

が、得られた結果は次のとおりです: 期待される結果がある私は、正確な間違いであるかを把握することができません

records size 4 
offset 2 
offset 3 
offset 4 
offset 5 
records size 6 
offset 0 
offset 1 
offset 2 
offset 3 
offset 4 
offset 5 
... 

答えて

0

、私はあなたと同じコードを書かれています。私にとってはうまくいっています。あなたがしたい場合は、以下のスニペットを使用することができます。

import java.util 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 
import org.apache.kafka.clients.consumer.KafkaConsumer 
import org.apache.kafka.common.TopicPartition 
import org.apache.kafka.common.serialization.LongDeserializer; 
import scala.collection.JavaConverters._ 
import java.util.Properties 

object ConsumerExample extends App { 

    val TOPIC = "test-stack" 

    val props = new Properties() 

    props.put("bootstrap.servers", "localhost:9092") 


    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("group.id", "testinf") 
    props.put("auto.offset.reset", "earliest") 
    props.put("auto.offset.reset.config", "false") 

    var listener = new ConsumerRebalanceListener() { 
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { 
     println("Assignment : " + partitions) 

    } 
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { 
     // do nothing 
    } 

    } 

    val consumer = new KafkaConsumer[String, String](props) 

    consumer.subscribe(util.Collections.singletonList(TOPIC), listener) 

    while (true) { 


    consumer.seekToBeginning(consumer.assignment()) 
    val records = consumer.poll(20000) 
    // for (record <- records.asScala) { 
    // println(record) 
    // } 
    println("records size "+records.count()) 
    records.asScala.foreach(rec => println("offset "+rec.offset())) 
    } 

} 

試してみてください。何か問題がある場合。

+0

ありがとうございますが、コンシューマーがすべてのレコードをすべての反復で(オフセット0からオフセット5まで)読み込むようにしたいと思います。問題は、最初の反復でオフセット0から読み取られないということです。 – DaliMidou

+0

最初の繰り返しでは、常にオフセット2からランダムに読み取りますか? – shakeel

+0

それはオフセット2からランダムに読み取ります – DaliMidou

関連する問題