2016-04-14 3 views
0

でRDDSをマージすることをお勧め何私は、彼らが同じ形式のものであり、その結果として、マルチRDDSを持って、それらをマージしたいしている:ここでは、Scalaの

RDD(id, HashMap[String, HashMap[String, Int]]) 
    ^   ^ ^
    |    |  | 
    identity  category distribution of the category 

は、RDDの例である:

(1001, {age={10=3,15=5,16=8, ...}}) 

HashMap[String, HashMap]の最初のキーStringは、統計のカテゴリであり、HashMap[String, HashMap]HashMap[String, Int]はカテゴリの分布です。さまざまなカテゴリの各分布を計算した後、結果をデータベースに保存できるように、それらをアイデンティティでマージします。ここで私は現在、得たものである:

def mergeRDD(rdd1: RDD[(String, util.HashMap[String, Object])], 
       rdd2:RDD[(String, util.HashMap[String, Object])]): RDD[(String, util.HashMap[String, Object])] = { 

    val mergedRDD = rdd1.join(rdd2).map{ 
    case (id, (m1, m2)) => { 
     m1.putAll(m2) 
     (id, m1) 
    } 
    } 
    mergedRDD 
} 
val mergedRDD = mergeRDD(provinceRDD, mergeRDD(mergeRDD(levelRDD, genderRDD), actionTypeRDD)) 

私は2つのRDDSたびにマージすることができるように、私は機能mergeRDDを書き、しかし、私はその関数が任意の感動が高く評価され、スカラ座への初心者として、非常にエレガントではありませんがわかりました。

+0

マージ機能のプロパティは何ですか? – eliasah

+0

@eliasahお返事ありがとうございますが、プロパティではどういう意味ですか? – armnotstrong

答えて

2

パフォーマンスを上げることなく、これを達成するための簡単な方法はありません。理由は、単純に2つのrddをマージするのではなく、ハッシュマップにrddの結合後に値を連結したいからです。

ここで、マージ機能が間違っています。現在の状態では、joinは実際にはinner joinを行い、いずれかのrddに存在する行が他のものに存在しないようにします。

正しい方法は次のようなものになります。

val mergedRDD = rdd1.union(rdd2).reduceByKey{ 
    case (m1, m2) => { 
     m1.putAll(m2) 
     } 
} 
+0

それを指摘してくれてありがとう – armnotstrong

0

あなたはそこからscala.collection.immutable.Map

java.util.HashMapを置き換えることがあります。

val rdds  = List(provinceRDD, levelRDD, genderRDD, actionTypeRDD) 
val unionRDD = rdds.reduce(_ ++ _) 
val mergedRDD = unionRDD.reduceByKey(_ ++ _) 

これはカテゴリがRDDS間で重複していないと仮定しています。