2016-09-01 15 views
0

私は、カフカを実行しているスパークストリーミングジョブを実行しています。スパークストリーミングGroupBy処理するタプルの部分

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => { 
    (mmd.topic, mmd.partition, mmd.offset, mmd.message) 
}) 

今、私は私のデータを持って来るように私は1つのバッチで処理することができ、同じトピック/パーティションですべてそうな話題やパーティションによってグループ化する:私はこのような中、メッセージを取得します。ここに使用する正しい機能は何ですか

messageStream.foreachRDD(x => x.? 

それはグループですか?それがグループであれば、私が持っているタプルの最初の2つの部分でどのようにグループ化するかです.KafkaRDD [0]には多くのメッセージが含まれますので、それぞれを処理できるようなメッセージのセットにグループ化したいチャンクと個々のメッセージのグループ化。

編集: だから下のフィードをもとに戻って私はこのようなものだろう:Kは(トピック、パーティション)であるように

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
     ? 
    })) 

が、それはK、Vに今ある、値は(オフセット、トピック)? タプルの1番目と2番目の部分が必要です。なぜなら、APIコールを使ってメッセージの処理方法を知ることができるからです。私がしたくないことは、多くのメッセージがトピック/パーティションに基づいて設定されているため、個々のメッセージでAPIを個別に呼び出すことです。

編集:

K:(トピック、パーティション)V:CompactBuffer((トピック、パーティション、オフセット、メッセージ)、())など

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
      val topic = x._1_.1 
      val partition = x._1._2 
      x._2.forEach(x=> ... 
     })) 

答えて

1
が、それはようになりまし来ることを実現

をタプルの最初の2つの部分でグループ化すると、次のように試すことができます。

messageStream groupBy (x => (x._1, x._2)) 
+0

あなたの言ったことに基づいて自分の反応を編集しました。 – theMadKing

+0

値は依然として4タプルです。 3番目と4番目のアイテムだけが必要な場合は、 'mapValues(v =>(v._3、v._4))'を実行する必要があります。 – ryan

+0

タプルの1番目と2番目の部分が必要です。なぜなら、APIコールを使ってメッセージの処理方法を知ることができるからです。私は自分のオフセットマネージャーを持っているので、3部目が必要です。私がしたくないことは、多くのメッセージがトピック/パーティションに基づいて設定されているため、個々のメッセージでAPIを個別に呼び出すことです。 – theMadKing

関連する問題