2016-04-07 6 views
4

スパークい。 詳細はhttp://seananderson.ca/2013/10/19/reshape.htmlを参照してください。をサポート我々はワイド>長く、長期>ワイドフォーマットからデータを変換するために、溶融しdcast使用溶融し、dcast

どちらかスカラ座やSparkRは大丈夫です。

私はこれを経験しましたblogscala functionsR APIです。 似たような仕事をする機能はありません。

スパークのいずれかの同等の機能はありますか?もしそうでなければ、スパークでそれを行う他の方法はありますか?

+0

そうではありません。あなたのデータをメモリに収めるには、 'as.data.frame()'を使ってSpark DataFrameをネイティブのdata.frameに変換し、再構成してSparkに書き戻します。 – Thomas

+0

何もありませんので。あなた自身で書く必要があります。 – eliasah

答えて

10

Reshaping Data with Pivot in Sparkは、pivotで再形成をサポートします。私はmeltがほぼと呼ばれるピボットの逆であると理解しました。私は比較的新しいSparkです。私の知る限り、私は溶融操作を実装しようとしました。

def melt(df: DataFrame, columns: List[String]): DataFrame ={ 

    val restOfTheColumns = df.columns.filterNot(columns.contains(_)) 
    val baseDF = df.select(columns.head, columns.tail: _*) 
    val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true))) 
    var newdf = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure) 

    for(variableCol <- restOfTheColumns){ 
     val colValues = df.select(variableCol).map(r=> r(0).toString) 
     val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString))) 
     var colDF =sqlContext.createDataFrame(colRdd, newStructure) 
     newdf =newdf.unionAll(colDF) 
    } 
    newdf 
    } 

これは作業です。しかし、私は効率についてはあまりよく分かりません。

+-----+---+---+----------+------+ 
| name|sex|age| street|weight| 
+-----+---+---+----------+------+ 
|Alice| f| 34| somewhere| 70| 
| Bob| m| 63| nowhere| -70| 
|Alice| f|612|nextstreet| 23| 
| Bob| m|612|  moon|  8| 
+-----+---+---+----------+------+ 

melt(df, List("name", "sex")) 

として使用することができ、結果は以下の通りです:

+-----+---+--------+----------+ 
| name|sex|variable|  value| 
+-----+---+--------+----------+ 
|Alice| f|  age|  34| 
| Bob| m|  age|  63| 
|Alice| f|  age|  612| 
| Bob| m|  age|  612| 
|Alice| f| street| somewhere| 
| Bob| m| street| nowhere| 
|Alice| f| street|nextstreet| 
| Bob| m| street|  moon| 
|Alice| f| weight|  70| 
| Bob| m| weight|  -70| 
|Alice| f| weight|  23| 
| Bob| m| weight|   8| 
+-----+---+--------+----------+ 

改善の余地がある場合、私はそれが有用であると思いますし、あなたのコメントに感謝します。

0

はここだけで、データセットの操作(なしRDDのもの)

case class Melt(meltColumns: String*) extends Transformer{ 

    override def transform(in: Dataset[_]): DataFrame = { 
    val nonMeltColumns = in.columns.filterNot{ meltColumns.contains } 
    val newDS = in 
     .select(nonMeltColumns.head,meltColumns:_*) 
     .withColumn("variable", functions.lit(nonMeltColumns.head)) 
     .withColumnRenamed(nonMeltColumns.head,"value") 

    nonMeltColumns.tail 
     .foldLeft(newDS){ case (acc,col) => 
     in 
      .select(col,meltColumns:_*) 
      .withColumn("variable", functions.lit(col)) 
      .withColumnRenamed(col,"value") 
      .union(acc) 
     } 
     .select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*) 
    } 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    @DeveloperApi 
    override def transformSchema(schema: StructType): StructType = ??? 

    override val uid: String = Identifiable.randomUID("Melt") 
} 

を使用していますspark.ml.Transformerだ。ここ

"spark" should "melt a dataset" in { 
    import spark.implicits._ 
    val schema = StructType(
     List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++ 
     Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList) 

    val ds = Range(1,11) 
     .map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)} 
     .|>{ rows => spark.sparkContext.parallelize(rows) } 
     .|>{ rdd => spark.createDataFrame(rdd,schema) } 

    val newDF = ds.transform{ df => 
     Melt("Melt1","Melt2").transform(df) } 

    assert(newDF.count() === 70) 
    } 

それを使用するテストです|。>

0

スパークscalaZパイプ演算子でありますDataFrameには、機能を提供するexplodeメソッドがあります。 Spark 1.6.1で動作する例:

// input df has columns (anyDim, n1, n2) 
case class MNV(measureName: String, measureValue: Integer); 
val dfExploded = df.explode(col("n1"), col("n2")) { 
    case Row(n1: Int, n2: Int) => 
    Array(MNV("n1", n1), MNV("n2", n2)) 
} 
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue) 
関連する問題