0

カフカのすべてのパーティションにスパーク書き込みメッセージを作成して、直接ストリームを使用してストリーミングのパフォーマンスを向上させる方法。ここスパークストリーミングを使用してカフカのトピックにメッセージを書き込むときは、ただ1つのパーティションに書き込むだけです。

は私のコードです: -

object kafka { 
def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("FlightawareSparkApp") 
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    val ssc = new StreamingContext(sparkConf, Seconds(3)) 
    val lines = ssc.socketTextStream("localhost", 18436) 
         val topic = "test" 
         val props = new java.util.Properties() 
         props.put("metadata.broker.list", "list") 
         props.put("bootstrap.servers", "list") 
         // props.put("bootstrap.servers", "localhost:9092") 
         // props.put("bootstrap.servers", "localhost:9092") 
         props.put("client.id", "KafkaProducer") 
         props.put("producer.type", "async") 
         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer") 
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
      lines.foreachRDD(rdd => { 
       rdd.foreachPartition(part => { 
        val producer = new KafkaProducer[Integer, String](props) 
        part.foreach(msg =>{ 
         val record = new ProducerRecord[Integer, String](topic, msg) 
         producer.send(record) 
        }) 
        producer.close() 
       }) 
      }) 
      ssc.start() 
      ssc.awaitTermination() 
} 

}

このコードはカフカのトピックにメッセージをプッシュするが、ときに私はどこ私ができる

/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKABROKERS --topic test --time -1 

を使用してカウントが出力を取得しています見ています1つのパーティション内のメッセージのみを参照してください。

test:8:0 
test:2:0 
test:5:0 
test:4:0 
test:7:0 
test:1:0 
test:9:0 
test:3:0 
test:6:237629 
test:0:0 

データをすべてのパーティションに分割する方法に関する提案。

パーティション間でメッセージを配信するために、プログラムでデフォルトでパーティションキーを実装する方法。

ありがとう、

アンクッシュレディ。

答えて

2

キーを設定していないためです。 Kafka FAQ [1]で、以下の詳細を見つけることができます。

パーティションキーが指定されていないと、データがパーティション間で均等に分散されないのはなぜですか?

Kafkaプロデューサでは、メッセージの宛先パーティションを示すためにパーティションキーを指定できます。デフォルトでは、ハッシュベースのパーティション化プログラムを使用して、そのキーが与えられたパーティションIDを特定し、カスタマイズされたパーティショニングも使用できます。

オープン・ソケットの数を減らすには、パーティション化キーが指定されていないかヌルの場合、0.8.0(https://issues.apache.org/jira/browse/KAFKA-1017)で、プロデューサはランダム・パーティションを選択してからしばらくの間(デフォルトは10分)別のものに切り替える。そのため、パーティションより少ないプロデューサが存在する場合、特定の時点で、一部のパーティションでデータが受信されないことがあります。この問題を軽減するには、メタデータの更新間隔を短縮するか、メッセージキーとカスタマイズされたランダムパーティショナーを指定します。詳細については、このスレッドhttp://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E

を参照してください[1] https://cwiki.apache.org/confluence/display/KAFKA/FAQ

+0

は返事をいただき、ありがとうございます。ここにランダムなキーを渡すと、それは動作します。 val record = new ProducerRecord [Integer、String](トピック、キー、8、msg)。 –

関連する問題