2016-12-08 5 views
0

(Key、Value)タイプのdstreamがあります。DStreamすべての同一キーを順次処理する必要があります

mapped2.foreachRDD(rdd => { 
    rdd.foreachPartition(p => { 
    p.foreach(x => { 
    } 
)}) 
}) 

私はこれを行うにはどのように

..同じキーを持つすべての項目を1つのパーティションにし、1 core..soによって処理され、実際に順次そこに処理されることを保証取得する必要がありますか?非効率なGroupBykeyを使用できますか?

答えて

1

あなたはPairDStreamFunctions.combineByKeyを使用することができます。

import org.apache.spark.HashPartitioner 
import org.apache.spark.streaming.dstream.DStream 
/** 
    * Created by Yuval.Itzchakov on 29/11/2016. 
    */ 
object GroupingDStream { 
    def main(args: Array[String]): Unit = { 
    val pairs: DStream[(String, String)] = ??? 
    val numberOfPartitions: Int = ??? 

    val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
     _ => List[String](), 
     (strings: List[String], s: String) => s +: strings, 
     (first: List[String], second: List[String]) => first ++ second, new HashPartitioner(numberOfPartitions)) 

    groupedByIds.foreachRDD(rdd => { 
     rdd.foreach((kvp: (String, List[String])) => { 

     }) 
    }) 
    } 
} 

combineByKeyの結果が第1の要素は、キーと値のコレクションである第2の要素であるとのタプルになります。メモ型を指定していないので、例を簡単にするために(String, String)を使用しました。

次に、foreachを使用して値のリストを反復し、必要に応じて順次処理します。追加の変換を適用する必要がある場合は、foreachRDDではなく、DStream.mapを使用して2番目の要素(値のリスト)を操作できます。

+0

こんにちは、私はpartionbykey(キー値のペアの非効率的なグループ化を避けるため)などの別の関数を使用することができます..私の上記のコードは、値が連続的に実行されることを保証できますエグゼクティブ)?グループ化する必要があります。すなわち、キー値のペアからの値によってアクセスされるか? – mahdi62

+0

@ mahdi62なぜcombinedByKeyは非効率的だと思いますか?エグゼキュータ内のすべての同様のキーをローカルで組み合わせ、結合した結果のみをワイヤでシャッフルします。 –

+0

このコードは、実際にはキーと値のペアを1つの項目で空リストにしています...コンバイナは(x:String)=> List [String](x)でなければなりません – mahdi62

関連する問題