2017-04-13 12 views
0

私はかなり単純なKafkaセットアップ - 1人のプロデューサー、1つのトピック、10のパーティション、同じグループIDを持つ10人のKafkaConsumersをすべて1台のマシンで実行しています。ファイルを処理すると、プロデューサは3269のメッセージを迅速に作成し、消費者はそれを楽しく使い始めることができます。しばらくの間、すべてうまく動作しますが、特定の時点で、消費者は重複を消費し始めます。重複がたくさんあります。実際には、メッセージキューをもう一度消費するように見えます。私はそれを長時間実行すると、データベースは同じデータエントリを6回以上受信し始めます。ロギングでいくつかのテストを行った後、消費者が同じユニークなメッセージ名で同じメッセージを再消費しているように見えます。Kafka 0.10.2消費者数が重複しています

私が知る限り、再調整は起こっていません。消費者は死ぬことも加えられない。それは同じ10人の消費者で、同じ3269メッセージを何度も繰り返し消費し、プロセスが終了するまで続きます。これをやめると、消費者は数十万件のレコードを書き、実際にデータベースに入るはずのデータ量を大幅に増やします。

私はカフカにはかなり新しいですが、なぜこのようなことが起こったのかは迷っています。私はカフカが正確に一度の処理を保証していないことを知っています、そして、私はここでそこにいくつかの重複で大丈夫です。同じレコードを再び永続化させないようにするコードがあります。しかし、消費者がキューを何度も何度も何度も再消費するのはなぜか分かりません。私はカフカのメッセージは消費された後で削除されないことを知っていますが、すべての消費者が同じグループに属していれば、オフセットはこれを防ぐべきでしょうか?私はオフセットの仕組みについて少しは理解していますが、わかっている限り、再調整がなければリセットされるべきではありませんか?そして、私の言う限り、メッセージはタイムアウトしていません。私の消費者が一度キュー内のすべてを消費してから、同じものを永久に再消費することなくもっと多くのメッセージを待つ方法がありますか?

は、ここで私はプロデューサーや消費者に渡すproprtiesです:私に

Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("group.id", "MyGroup"); 
     props.put("num.partitions", 10); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

     MyIngester ingester = new MyIngester(args[0], props); 

答えて

1

これは、受信を確認すると、問題のようです。 次のプロパティを試してください

props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "100"); 
関連する問題