2017-10-03 4 views
-2

私は2つのrddを持っていますが、両方とも次のように定義されています:rdd(String、Map(String、String))、それらをマージする必要があります。 RDDのような(文字列、リスト(MAP1、MAP2))私はマップでRddをマージする

+0

追加質問関連コードはこちら –

答えて

0

にこれを行うための効率的な方法を教えてくださいあなたは完全外部を実行することにより、RDDSをマージすることができ、次の例のように参加:

val rdd1 = sc.parallelize(Seq(
    ("a", Map("x1"->"u1", "y1"->"v1")), 
    ("b", Map("x2"->"u2", "y2"->"v2", "z2"->"w2")) 
)) 

val rdd2 = sc.parallelize(Seq(
    ("a", Map("m1"->"p1", "n1"->"q1")), 
    ("c", Map("m2"->"p2", "22"->"q2")) 
)) 

val rddJoined = rdd1.fullOuterJoin(rdd2).map{ 
    case (k, (u, v)) => 
    (k, Seq(u.getOrElse(Map[String,String]()), v.getOrElse(Map[String,String]()))) 
} 

rddJoined.collect 
res1: Array[(String, Seq[Option[scala.collection.immutable.Map[String,String]]])] = Array(
    (a, List(Map(x1 -> u1, y1 -> v1), Map(m1 -> p1, n1 -> q1))), 
    (b, List(Map(x2 -> u2, y2 -> v2, z2 -> w2), Map())), 
    (c, List(Map(), Map(m2 -> p2, 22 -> q2))) 
) 
+0

ありがとうございます。出来た :) –

関連する問題