2017-06-27 7 views
1

私はkafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100を使用してトピックをコンパクトにしました。 私は同じキーを持つ2000のメッセージを送っています。私がそれらのメッセージを消費するとき、私は1つの圧縮されたメッセージではなく、それぞれのメッセージを個別に取得します。トピックのためのカフカ締固め

答えて

0

あなたが参照している圧縮設定は、Kafkaクライアントとメッセージをどのように消費するかには関係ありません。詳細はofficial documentation hereをご確認ください。

クライアントがメッセージをどのように消費するかを制御するには、client config propertiesを使用してクライアントを設定する必要があります。

トピックを300ミリ秒間プールし、各メッセージを個別に処理するために反復処理できる一連のメッセージ(ConsumerRecords)を受信する状況を考えてみましょう。

while(true) { 
    ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300); 
     if(records.count() > 0) { 
      for(ConsumerRecord<String, JsonNode> record: records) { 
      if(counter % 500 == 0) { 
       log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}", 
       this.groupId, this.topicNames, record.key(), record.value(), record.offset()); 
        } 
       } 
      } 
     } 
+0

私たちはこのkey = value1 key = value2のようなものを持っているとしましょう。ここでvalue2はkeyの更新された値です。更新された値だけを保持するにはどうすればよいですか? – codehacker

+1

こんにちは、kafkaの仕事ではなく、最新の情報だけを保持することはできません。kafka 0.10.1で導入されたkafkaストリームとKtableまたはGlobalKTableを使用する必要があると言われています。または0.10.0まで私は今までには覚えていない.... https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/ https:// kafka .apache.org/documentation/streams#streams_kstream_ktable –

+0

@codehacker - 上記のコメントを参照してください。あなたにタグを付けるのを忘れました。 –

関連する問題