2017-07-09 2 views
0

特定のメッセージセットに対してKafkaプロデューサによって作成されたバッチの数を特定する方法はありますか?たとえば、ループで10Kのメッセージを送信している場合、送信されたバッチの数を確認する方法はありますか?私は "batch.size"を高い値に設定しました。私の期待は、メッセージがバッファリングされ、消費者のメッセージを見るのが遅くなるということでした。しかし、これは私の消費者プログラムでほぼ即座に印刷されるようです。Kafka - 生産者バッチ数

batch.sizeが16384の場合のデフォルト値です。このバイト数ですか?

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 

import java.util.Date; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Properties; 


public class KafkaProducerApp { 

    public static void main(String[] args){ 
     Properties properties = new Properties(); 
     properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094"); 
     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
     properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
     properties.put("acks","0"); 
     properties.put("batch.size",33554432); 

     KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties); 
     Map<Integer,Integer> partitionCount = new HashMap<Integer,Integer>(); 
     partitionCount.put(0,0); 
     partitionCount.put(1,0); 
     partitionCount.put(2,0); 


     try{ 
      Date from = new Date(); 
      for(int i=0;i<10000;i++) { 
       RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get(); 
       //RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String,String>("test_topic",0,Integer.toString(i), "MyMessage" + Integer.toString(i))).get(); 
       System.out.println(" Offset = " + ack.offset()); 
       System.out.println(" Partition = " + ack.partition()); 
       partitionCount.put(ack.partition(),partitionCount.get(ack.partition())+1); 

      } 
      Date to = new Date(); 
      System.out.println(" partition 0 =" + partitionCount.get(0)); 
      System.out.println(" partition 1 =" + partitionCount.get(1)); 
      System.out.println(" partition 2 =" + partitionCount.get(2)); 
      System.out.println(" Elapsed Time = " + (to.getTime()-from.getTime())/1000); 

     } catch (Exception ex){ 
      ex.printStackTrace(); 
     } finally { 
      kafkaProducer.close(); 
     } 



    } 

} 

答えて

1

あなたが求めているのは、生産要求の総数です。

あなたは二JMX Mbeanののkafka.producerを使用してあたりの生産要求の平均数を確認できます。type =プロデューサー・メトリクス、クライアントID =([ - ワット] +)

+0

おかげハンスを!私は "batch.size"を高い値に設定しました。私の期待は、メッセージがバッファリングされ、消費者のメッセージを見るのが遅くなるということでした。しかし、これは私の消費者プログラムでほぼ即座に印刷されるようです。遅れはないでしょうか?実際にバッファリングされているかどうかはどのように判断できますか? –

+0

カフカのすべてがバッファリングされています。生産者と消費者の両方。サイズと時間ベースの両方のしきい値がありますが、バッファ全体がいっぱいになったときにのみバッチを送信するとは思われません。ここでlinger.msとbatch.sizeの説明をご覧くださいhttp://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html –

+0

ありがとうございました! –

関連する問題