2017-06-06 8 views
3

Apache Spark 2.0 Dataframe/Dataset APIを使用しています 値リストからデータフレームに新しい列を追加します。私のリストは与えられたデータフレームのような同じ数の値を持っています。Apache Sparkリスト/配列から新しい列をSparkデータフレームに追加する方法

val list = List(4,5,10,7,2) 
val df = List("a","b","c","d","e").toDF("row1") 

私のような何かをしたいと思います:私はGREATFULだろう任意のアイデアについて

val appendedDF = df.withColumn("row2",somefunc(list)) 
df.show() 
// +----+------+ 
// |row1 |row2 | 
// +----+------+ 
// |a |4 | 
// |b |5 | 
// |c |10 | 
// |d |7 | 
// |e |2 | 
// +----+------+ 

を、実際に私のデータフレームは、多くの列が含まれています。

+0

リストとDFのサイズが異なる場合はどうなりますか?より大きいコレクション(N =短いコレクションのサイズ)の最初のN個のアイテムのみを含めますか? –

+0

私の場合、常に同じ長さになることはわかっています –

+0

リストをデータフレームにも変換できます。次に両方にrow_numberを追加し、row_numberで結合します。 –

答えて

5

あなたはこのようにそれを行うことができます:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._  

// create rdd from the list 
val rdd = sc.parallelize(List(4,5,10,7,2)) 
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28 

// zip the data frame with rdd 
val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2))) 
// rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32 

// create a new data frame from the rdd_new with modified schema 
spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show 
+----+-------+ 
|row1|new_col| 
+----+-------+ 
| a|  4| 
| b|  5| 
| c|  10| 
| d|  7| 
| e|  2| 
+----+-------+ 
4

を完全にするために追加:(ドライバー・メモリに存在する)入力listが同じサイズを持っているという事実をDataFrameが、これは小規模データフレームであることを示唆しているよう始まる - あなたはcollect()は、それを-ing listでビュン、および必要に応じてDataFrameに戻す変換検討するかもしれない:

df.collect() 
    .map(_.getAs[String]("row1")) 
    .zip(list).toList 
    .toDF("row1", "row2") 

速くされないことが、もしデータは実際には小さく、無視できるものであり、コードは(間違いなく)明確です。

+1

私はこの答えも本当に好きですが、私は小さなデータセットでは完全に実現可能です –