2017-08-16 19 views
0

複数のRDDの文字列をRDDの特定の順序でマージしようとしています。私はMap[String, RDD[Seq[String]]]を作成(Seqが一つだけの要素が含まれている場合)、その後RDD[Row[String]]にそれらをマージしようとしたが、動作しているようですしません(RDD[Seq[String]]の内容が失われている)。..誰かが任意のアイデアを持っていますか?複数のRDDを特定の順序でマージする

val t1: StructType 
val mapFields: Map[String, RDD[Seq[String]]] 
var ordRDD: RDD[Seq[String]] = context.emptyRDD 
t1.foreach(field => ordRDD = ordRDD ++ mapFiels(field.name)) 
val rdd = ordRDD.map(line => Row.fromSeq(line)) 

EDIT: 私のRDDSは、各パーティションの要素の同じ数を持っていなかったので、スパーク例外にジップ機能リードを使用します。私はどのように各パーティションに同じ数の要素があるかを確認する方法がわからないので、インデックスをつけてから、ListMapを使って順番に結合しています。おそらく、mapPartitions関数で行うトリックがあるかもしれませんが、私はまだSpark APIについて十分な知識がありません。

val mapFields: Map[String, RDD[String]] 
var ord: ListMap[String, RDD[String]] = ListMap() 
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name))) 
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition 
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq) 
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) }) 
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)}) 
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) } 
val df1 = spark.createDataFrame(rowRdd, t1) 
+0

あなたは何をすべきか「マージ」とは、各RDDが1つの_column_を結果に「寄与」することを意味しますか?もしそうなら、すべてのRDDが同じサイズでない場合はどうなりますか? –

+0

はい、それぞれの「RDD」は列になります。 RDDは同じサイズであると想定されます。私はこの状況を考慮に入れる必要はないと思う。 – belgacea

答えて

1

ここで重要なのは、RDDSは(各レコードはエルRDDSで同じインデックスを持つレコードの組み合わせであるRDDを作成する)一緒に「郵便番号」にRDD.zipを使用している:

import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 

// INPUT: Map does not preserve order (not the defaul implementation, at least) - using Seq 
val rdds: Seq[(String, RDD[String])] = Seq(
    "field1" -> sc.parallelize(Seq("a", "b", "c")), 
    "field2" -> sc.parallelize(Seq("1", "2", "3")), 
    "field3" -> sc.parallelize(Seq("Q", "W", "E")) 
) 

// Use RDD.zip to zip all RDDs together, then convert to Rows 
val rowRdd: RDD[Row] = rdds 
    .map(_._2) 
    .map(_.map(s => Seq(s))) 
    .reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map { case (l1, l2) => l1 ++ l2 }) 
    .map(Row.fromSeq) 

// Create schema using the column names: 
val schema: StructType = StructType(rdds.map(_._1).map(name => StructField(name, StringType))) 

// Create DataFrame: 
val result: DataFrame = spark.createDataFrame(rowRdd, schema) 

result.show 
// +------+------+------+ 
// |field1|field2|field3| 
// +------+------+------+ 
// |  a|  1|  Q| 
// |  b|  2|  W| 
// |  c|  3|  E| 
// +------+------+------+ 
+0

私が言ったとは異なり、火花が各パーティション内の要素の数が同じでRDDSを唯一のジップことができるので、RDDのサイズとパーティションは、考慮して取るしなければならないようです。それ以外の場合は、スパーク例外が発生する可能性があります。 – belgacea

関連する問題