2016-08-20 7 views
-5

私のデータは、どちらかがスパークの場合、削減する最速のデータ構造は何ですか?地図やタプルのリスト?

RDD[Map[String, Map[String, Int]]] 

か、第二の例では、データの「マッピング」を見ることができるように

RDD[List[(String, List[(String,Int)])]] 

として最初によって強制され、二つの形式のRDDSに存在する可能性がタプルの要素がキーです。私のrddの2つのエントリを考えてみましょう。それらをR1とR2と呼んでください。私はR1とR2のキーでマージします。 R1とR2の両方に同じキーが含まれている場合、これらの値をさらにマージします。例として、その結果のマージがそう

outer_key1 -> (inner_key1 -> 2) 

が生成されます、私の質問は、構造がより速く、より多くのメモリ効率的に火花のためにあるデータであり、R1とR2の両方がエントリが含まれていることを

outer_key1 -> (inner_key1 -> 1) 

を言います外側キーと内側キーで減らしますか?マップのマップまたは(key、list_of_tuple)のリスト私の直感は、0(1)ルックアップを考えると、地図をキーで減らす方が速いということです。しかし、ほとんどのマップが実装されていることを考えると、マップベースのRDDSにはかなりの無駄なメモリがあると確信しています。

マージのこのタイプの実際の生活の例として、私のRDDSは、私はあなたがRDDのの概念について誤理解したと思う

Map(email_address->(date->number_of_emails_recieved_that_day)) where each RDD contains many email addresses 
+0

は、あなたがあなたの内側のデータに対して 'findByKey'アクセスまたは' sequentialAccess'を持っているとのWANかどうかに依存します。 –

+0

RDDは 'RDD [T]'で、 'T'は' Map [String、Map [String、Int]] 'です。それは 'RDD [(K、T)]ではないので、' Key'の意味は何ですか? –

答えて

0

を表しています。 RDDの力を活用するには、データを適切な構造に変換する必要があります。

だから、あなたはあなたのRDDを決定するために計算したいものについて考える必要があります。

あなたの質問を理解した通りです。 2つのデータソースがあり、これらの2つのデータソースから取得したデータをマージする必要があります。したがって、これらのソースから2つのRDDを作成し、それらをマージします。

// First we will have to create RDD's from our data source.  

// create RDD from source 1 
// Lets say you have a List[(String, List[(String, Int)]] 
val src1 = List(
    ("[email protected]", List(("01/01/2016", 10), ("05/01/2016", 20))) 
    ("[email protected]", List(("01/01/2016", 5), ("06/01/2016", 30)) 
) 

// Now enters spark 
val rddSrc1: RDD[(String, List[String, Int])] = sc.parallelize(src1) 


// create RDD from source 2 
// Lets say you have a Map[(String, Map[String, Int]] 
val src2 = Map(
    "[email protected]" -> Map("01/01/2016" -> 10, "05/01/2016" -> 20) 
    "[email protected]" -> Map("01/01/2016" -> 5, "06/01/2016" -> 30) 
) 

// Now enters spark 
val rddSrc1: RDD[(String, Map[String, Int])] = sc.parallelize(src2.toList) 


// Now since you want to merge on both "email" and "date" lets make ("email", "date") tuple as key. 

rddSrc1T: RDD[(String, String), Int] = rddSrc1 
    .flatMap({ 
    case (email, list) => list.map({ 
     case (date, number) => ((email, date), number) 
    }) 
    }) 

rddSrc2T: RDD[(String, String), Int] = rddSrc1 
    .flatMap({ 
    case (email, map) => map.toList.map({ 
     case (date, number) => ((email, date), number) 
    }) 
    }) 

// now co-group the 2 RDD's 
rddCogroup: RDD[((String, String), Iterable[Int], Iterable[Int])) = rddSrc1T.cogroup(rddSrc2T) 

val totalNumberRdd: RDD[((String, String), Int] = rddCogroup.map({ 
    case ((email, date), iter1, iter2) => ((email, date), iter1.sum + iter2.sum) 
}) 
+0

(ここには何も慣れていませんが、変数名が正しくないと表示されます。たとえば、src1/src2はリスト/マップと並行して呼び出されます) – Will

+0

何ですか?あなたの声明は明確ではありません。 –

+0

うん...それは入力ミスです。 –

関連する問題