2017-08-01 13 views
0

RDD [Vector]からscala/spark 1.6のDataFrameへの変換を一般化する最適なソリューションは何ですか? 入力はRDD [ベクトル]と異なります。 Vectorの列番号は、RDDごとに1〜nです。Spark - RDD [Vector]を可変列のDataFrameに変換する

シェイプレスライブラリを使用しようとしましたが、宣言された列番号と型が必要です。 ES:

val df = rddVector.map(_.toArray.toList) 
    .collect { 
      case t: List[Double] if t.length == 3 => t.toHList[Double :: Double :: Double :: HNil].get.tupled.productArity 
    } 
    .toDF("column_1", "column_2", "column_3") 

ありがとうございます!

+0

私が理解から、私はここに似たような答え:それはあなたのために働く場合https://stackoverflow.com/a/45009516/7224597 あなたは確認することができますか? – philantrovert

答えて

2

これは私のために働いた。

// Create a vector rdd 
    val vectorRDD = sc.parallelize(Seq(Seq(123L, 345L), Seq(567L, 789L), Seq(567L, 789L, 233334L))). 
    map(s => Vectors.dense(s.toSeq.map(_.toString.toDouble).toArray)) 

    // Calculate the maximum length of the vector to create a schema 
    val vectorLength = vectorRDD.map(x => x.toArray.length).max() 

    // create the dynamic schema 
    var schema = new StructType() 
    var i = 0 
    while (i < vectorLength) { 
    schema = schema.add(StructField(s"val${i}", DoubleType, true)) 
    i = i + 1 
    } 

    // create a rowRDD variable and make each row have the same arity 
    val rowRDD = vectorRDD.map { x => 
    var row = new Array[Double](vectorLength) 
    val newRow = x.toArray 

    System.arraycopy(newRow, 0, row, 0, newRow.length); 

    println(row.length) 

    Row.fromSeq(row) 
    } 

    // create your dataframe 
    val dataFrame = sqlContext.createDataFrame(rowRDD, schema) 

出力:

root 
|-- val0: double (nullable = true) 
|-- val1: double (nullable = true) 
|-- val2: double (nullable = true) 

+-----+-----+--------+ 
| val0| val1| val2| 
+-----+-----+--------+ 
|123.0|345.0|  0.0| 
|567.0|789.0|  0.0| 
|567.0|789.0|233334.0| 
+-----+-----+--------+ 
+0

このソリューションでは、固定スキーマを作成する必要があります。 私はスキーマを知らない。スキーマは可変です。 My Sparkのバージョンは1.6、2.0はありません。 –

+1

私はあなたが提供している条件に合わせて答えを更新しました。それは解決策の最も素質ではありませんが、うまくいくでしょう:) –

関連する問題