私はScalaのプロデューサーを書いていますので、バッチ処理したいと思います。バッチ処理がうまくいく方法は、メッセージがいっぱいになるまでキュー内のメッセージを保持し、そのメッセージをすべてトピックにまとめて投稿することです。しかし、何とかそれは動作していません。メッセージの送信が始まると、メッセージが1つずつ投稿され始めます。誰もカフカプロデューサーでバッチ処理を使用する方法を知っていますか?あなたはデフォルトでは、あなたの小道具にカフカのバッチ処理がScalaのプロデューサで動作しません
をlinger.ms
を設定する必要があり
val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
val batchSize: java.lang.Integer = 163840
val props = new Properties()
props.put("key.serializer", kafkaStringSerializer)
props.put("value.serializer", kafkaStringSerializer)
props.put("batch.size", batchSize);
props.put("bootstrap.servers", "localhost:9092")
val producer = new KafkaProducer[String,String](props)
val TOPIC="topic"
val inlineMessage = "adsdasdddddssssssssssss"
for(i<- 1 to 10){
val record: ProducerRecord[String, String] = new ProducerRecord(TOPIC, inlineMessage)
val futureResponse: Future[RecordMetadata] = producer.send(record)
futureResponse.isDone
println("Future Response ==========>" + futureResponse.get().serializedValueSize())
}
を参照して、バッチ処理をサポートしています。私はprops.put( "linger.ms"、5000)を持っています。しかし、まだ動作していません。今私のメッセージが5秒遅れて来るのが見えます。メッセージはまだ1つずつ来ますが、5秒の遅れがあります。 – user1733735
メッセージはバッチで格納され、バッチで取り出されますが、依然として個々のメッセージとしてコンシューマーに提示されます。 1つのメッセージを読んで、応答としてメッセージのバッチを取得することは期待しないでください。それはカフカバッチ処理の仕組みではありません。 –
したがって、バッチ処理が正しく行われたことを確認する必要がある場合。それをどのように確認できますか? – user1733735