-2
私は2つのrddを持っていますが、両方とも次のように定義されています:rdd(String、Map(String、String))、それらをマージする必要があります。 RDDのような(文字列、リスト(MAP1、MAP2))私はマップでRddをマージする
私は2つのrddを持っていますが、両方とも次のように定義されています:rdd(String、Map(String、String))、それらをマージする必要があります。 RDDのような(文字列、リスト(MAP1、MAP2))私はマップでRddをマージする
にこれを行うための効率的な方法を教えてくださいあなたは完全外部を実行することにより、RDDSをマージすることができ、次の例のように参加:
val rdd1 = sc.parallelize(Seq(
("a", Map("x1"->"u1", "y1"->"v1")),
("b", Map("x2"->"u2", "y2"->"v2", "z2"->"w2"))
))
val rdd2 = sc.parallelize(Seq(
("a", Map("m1"->"p1", "n1"->"q1")),
("c", Map("m2"->"p2", "22"->"q2"))
))
val rddJoined = rdd1.fullOuterJoin(rdd2).map{
case (k, (u, v)) =>
(k, Seq(u.getOrElse(Map[String,String]()), v.getOrElse(Map[String,String]())))
}
rddJoined.collect
res1: Array[(String, Seq[Option[scala.collection.immutable.Map[String,String]]])] = Array(
(a, List(Map(x1 -> u1, y1 -> v1), Map(m1 -> p1, n1 -> q1))),
(b, List(Map(x2 -> u2, y2 -> v2, z2 -> w2), Map())),
(c, List(Map(), Map(m2 -> p2, 22 -> q2)))
)
ありがとうございます。出来た :) –
追加質問関連コードはこちら –