2017-08-09 8 views
2

私はScalaのプロデューサーを書いていますので、バッチ処理したいと思います。バッチ処理がうまくいく方法は、メッセージがいっぱいになるまでキュー内のメッセージを保持し、そのメッセージをすべてトピックにまとめて投稿することです。しかし、何とかそれは動作していません。メッセージの送信が始まると、メッセージが1つずつ投稿され始めます。誰もカフカプロデューサーでバッチ処理を使用する方法を知っていますか?あなたはデフォルトでは、あなたの小道具にカフカのバッチ処理がScalaのプロデューサで動作しません

linger.msを設定する必要があり

val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer" 
     val batchSize: java.lang.Integer = 163840 
     val props = new Properties() 
     props.put("key.serializer", kafkaStringSerializer) 
     props.put("value.serializer", kafkaStringSerializer) 
     props.put("batch.size", batchSize); 
     props.put("bootstrap.servers", "localhost:9092") 

     val producer = new KafkaProducer[String,String](props) 

     val TOPIC="topic" 
     val inlineMessage = "adsdasdddddssssssssssss" 

     for(i<- 1 to 10){ 
     val record: ProducerRecord[String, String] = new ProducerRecord(TOPIC, inlineMessage) 
     val futureResponse: Future[RecordMetadata] = producer.send(record) 
     futureResponse.isDone 
     println("Future Response ==========>" + futureResponse.get().serializedValueSize()) 
     } 

答えて

1

、そのメッセージは可能であればすぐに送っていることを意味し、ゼロになります。 バッチが発生するように増やすことができます(たとえば100回)。これはレイテンシは長くなりますが、スループットが向上することを意味します。

batch.sizeです。linger.msが経過する前に届くと、さらに時間を待つことなくデータが送信されます。

実際に送信されたバッチを表示するには、ロギングを構成する必要があります(バッチ処理はバックグラウンドスレッドで行われ、プロデューサAPIで実行されたバッチを見ることはできません。バッチを送受信できません、のみレコードを送信し、その応答を受信し、バッチを介したブローカーとの通信が内部的に行われている)

まず、まだ行っていない場合は、log4jのプロパティは、例えば(Dlog4j.configuration=file:path/to/log4j.properties

log4j.rootLogger=WARN, stderr 
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=TRACE, stderr 

log4j.appender.stderr=org.apache.log4j.ConsoleAppender 
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout 
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n 
log4j.appender.stderr.Target=System.err 

ファイル結合し、私は意志受け取る

TRACE Sent produce request to 2: (type=ProduceRequest, magic=1, acks=1, timeout=30000, partitionRecords=({test-1=[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=2237306008, CreateTime=1502444105996, key=0 bytes, value=2 bytes))), (record=LegacyRecordBatch(offset=1, Record(magic=1, attributes=0, compression=NONE, crc=3259548815, CreateTime=1502444106029, key=0 bytes, value=2 bytes)))]}), transactionalId='' (org.apache.kafka.clients.producer.internals.Sender) 

2つのデータのバッチです。バッチには同じブローカーに送信されるレコードが含まれます

次に、batch.sizeとlinger.msを使用して違いを確認してください。レコードにはオーバーヘッドが含まれているので、1000のbatch.sizeには10個のメッセージが含まれていないことに注意してください。

log4j.logger.orgのようなすべてのロガーとその動作を記述した文書は見つかりませんでした。 apache.kafka.clients.producer.internals.Sender)。あなたはrootLoggerでDEBUG/TRACEを有効にして、必要なデータを見つけることができます。explore the code

+0

を参照して、バッチ処理をサポートしています。私はprops.put( "linger.ms"、5000)を持っています。しかし、まだ動作していません。今私のメッセージが5秒遅れて来るのが見えます。メッセージはまだ1つずつ来ますが、5秒の遅れがあります。 – user1733735

+3

メッセージはバッチで格納され、バッチで取り出されますが、依然として個々のメッセージとしてコンシューマーに提示されます。 1つのメッセージを読んで、応答としてメッセージのバッチを取得することは期待しないでください。それはカフカバッチ処理の仕組みではありません。 –

+0

したがって、バッチ処理が正しく行われたことを確認する必要がある場合。それをどのように確認できますか? – user1733735

0

あなたはKafkaサーバーに同期してデータを生成しています。つまり、producer.sendfutureResponse.getと呼ぶと、データがKafkaサーバーに格納された後にのみ返されます。

応答を別のリストに格納し、forループ外のfutureResponse.getを呼び出します。デフォルトconfiguration

、カフカは私がやったlinger.msbatch.size

List<Future<RecordMetadata>> responses = new ArrayList<>(); 
for (int i=1; i<=10; i++) { 
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, inlineMessage); 
    Future<RecordMetadata> response = producer.send(record); 
    responses.add(response); 
} 

for (Future<RecordMetadata> response : responses) { 
    response.get(); // verify whether the message is sent to the broker. 
} 
関連する問題