あなたはPairDStreamFunctions.combineByKey
を使用することができます。
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream
/**
* Created by Yuval.Itzchakov on 29/11/2016.
*/
object GroupingDStream {
def main(args: Array[String]): Unit = {
val pairs: DStream[(String, String)] = ???
val numberOfPartitions: Int = ???
val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
_ => List[String](),
(strings: List[String], s: String) => s +: strings,
(first: List[String], second: List[String]) => first ++ second, new HashPartitioner(numberOfPartitions))
groupedByIds.foreachRDD(rdd => {
rdd.foreach((kvp: (String, List[String])) => {
})
})
}
}
combineByKey
の結果が第1の要素は、キーと値のコレクションである第2の要素であるとのタプルになります。メモ型を指定していないので、例を簡単にするために(String, String)
を使用しました。
次に、foreach
を使用して値のリストを反復し、必要に応じて順次処理します。追加の変換を適用する必要がある場合は、foreachRDD
ではなく、DStream.map
を使用して2番目の要素(値のリスト)を操作できます。
こんにちは、私はpartionbykey(キー値のペアの非効率的なグループ化を避けるため)などの別の関数を使用することができます..私の上記のコードは、値が連続的に実行されることを保証できますエグゼクティブ)?グループ化する必要があります。すなわち、キー値のペアからの値によってアクセスされるか? – mahdi62
@ mahdi62なぜcombinedByKeyは非効率的だと思いますか?エグゼキュータ内のすべての同様のキーをローカルで組み合わせ、結合した結果のみをワイヤでシャッフルします。 –
このコードは、実際にはキーと値のペアを1つの項目で空リストにしています...コンバイナは(x:String)=> List [String](x)でなければなりません – mahdi62