私は、カフカを実行しているスパークストリーミングジョブを実行しています。スパークストリーミングGroupBy処理するタプルの部分
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic, mmd.partition, mmd.offset, mmd.message)
})
今、私は私のデータを持って来るように私は1つのバッチで処理することができ、同じトピック/パーティションですべてそうな話題やパーティションによってグループ化する:私はこのような中、メッセージを取得します。ここに使用する正しい機能は何ですか
messageStream.foreachRDD(x => x.?
それはグループですか?それがグループであれば、私が持っているタプルの最初の2つの部分でどのようにグループ化するかです.KafkaRDD [0]には多くのメッセージが含まれますので、それぞれを処理できるようなメッセージのセットにグループ化したいチャンクと個々のメッセージのグループ化。
編集: だから下のフィードをもとに戻って私はこのようなものだろう:Kは(トピック、パーティション)であるように
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
?
}))
が、それはK、Vに今ある、値は(オフセット、トピック)? タプルの1番目と2番目の部分が必要です。なぜなら、APIコールを使ってメッセージの処理方法を知ることができるからです。私がしたくないことは、多くのメッセージがトピック/パーティションに基づいて設定されているため、個々のメッセージでAPIを個別に呼び出すことです。
編集:
K:(トピック、パーティション)V:CompactBuffer((トピック、パーティション、オフセット、メッセージ)、())など
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
val topic = x._1_.1
val partition = x._1._2
x._2.forEach(x=> ...
}))
あなたの言ったことに基づいて自分の反応を編集しました。 – theMadKing
値は依然として4タプルです。 3番目と4番目のアイテムだけが必要な場合は、 'mapValues(v =>(v._3、v._4))'を実行する必要があります。 – ryan
タプルの1番目と2番目の部分が必要です。なぜなら、APIコールを使ってメッセージの処理方法を知ることができるからです。私は自分のオフセットマネージャーを持っているので、3部目が必要です。私がしたくないことは、多くのメッセージがトピック/パーティションに基づいて設定されているため、個々のメッセージでAPIを個別に呼び出すことです。 – theMadKing