複数のRDDの文字列をRDDの特定の順序でマージしようとしています。私はMap[String, RDD[Seq[String]]]
を作成(Seq
が一つだけの要素が含まれている場合)、その後RDD[Row[String]]
にそれらをマージしようとしたが、動作しているようですしません(RDD[Seq[String]]
の内容が失われている)。..誰かが任意のアイデアを持っていますか?複数のRDDを特定の順序でマージする
val t1: StructType
val mapFields: Map[String, RDD[Seq[String]]]
var ordRDD: RDD[Seq[String]] = context.emptyRDD
t1.foreach(field => ordRDD = ordRDD ++ mapFiels(field.name))
val rdd = ordRDD.map(line => Row.fromSeq(line))
EDIT: 私のRDDSは、各パーティションの要素の同じ数を持っていなかったので、スパーク例外にジップ機能リードを使用します。私はどのように各パーティションに同じ数の要素があるかを確認する方法がわからないので、インデックスをつけてから、ListMap
を使って順番に結合しています。おそらく、mapPartitions
関数で行うトリックがあるかもしれませんが、私はまだSpark APIについて十分な知識がありません。
val mapFields: Map[String, RDD[String]]
var ord: ListMap[String, RDD[String]] = ListMap()
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name)))
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq)
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) })
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)})
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) }
val df1 = spark.createDataFrame(rowRdd, t1)
あなたは何をすべきか「マージ」とは、各RDDが1つの_column_を結果に「寄与」することを意味しますか?もしそうなら、すべてのRDDが同じサイズでない場合はどうなりますか? –
はい、それぞれの「RDD」は列になります。 RDDは同じサイズであると想定されます。私はこの状況を考慮に入れる必要はないと思う。 – belgacea