カフカのすべてのパーティションにスパーク書き込みメッセージを作成して、直接ストリームを使用してストリーミングのパフォーマンスを向上させる方法。ここスパークストリーミングを使用してカフカのトピックにメッセージを書き込むときは、ただ1つのパーティションに書き込むだけです。
は私のコードです: -
object kafka {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("FlightawareSparkApp")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 18436)
val topic = "test"
val props = new java.util.Properties()
props.put("metadata.broker.list", "list")
props.put("bootstrap.servers", "list")
// props.put("bootstrap.servers", "localhost:9092")
// props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("producer.type", "async")
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
lines.foreachRDD(rdd => {
rdd.foreachPartition(part => {
val producer = new KafkaProducer[Integer, String](props)
part.foreach(msg =>{
val record = new ProducerRecord[Integer, String](topic, msg)
producer.send(record)
})
producer.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}
このコードはカフカのトピックにメッセージをプッシュするが、ときに私はどこ私ができる
/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1
を使用してカウントが出力を取得しています見ています1つのパーティション内のメッセージのみを参照してください。
test:8:0
test:2:0
test:5:0
test:4:0
test:7:0
test:1:0
test:9:0
test:3:0
test:6:237629
test:0:0
データをすべてのパーティションに分割する方法に関する提案。
パーティション間でメッセージを配信するために、プログラムでデフォルトでパーティションキーを実装する方法。
ありがとう、
アンクッシュレディ。
は返事をいただき、ありがとうございます。ここにランダムなキーを渡すと、それは動作します。 val record = new ProducerRecord [Integer、String](トピック、キー、8、msg)。 –