2017-05-21 16 views
0

2つのRDDの列を持つDataframeを作成したいとします。 最初はCSVから取得したRDDで、もう1つは各行のクラスタ予測による別のRDDです。DataFrame列にRDDを追加PySpark

私のスキーマは次のとおりです。

customSchema = StructType([ \ 
StructField("Area", FloatType(), True), \ 
StructField("Perimeter", FloatType(), True), \ 
StructField("Compactness", FloatType(), True), \ 
StructField("Lenght", FloatType(), True), \ 
StructField("Width", FloatType(), True), \ 
StructField("Asymmetry", FloatType(), True), \ 
StructField("KernelGroove", FloatType(), True)]) 

は私のRDDの地図とデータフレームを作成します。

FN2 = rdd.map(lambda x: (float(x[0]), float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6]))) 
df = sqlContext.createDataFrame(FN2, customSchema) 

そして、私のクラスタ予測:

result = Kmodel.predict(rdd) 

だから、私は持っていたい締結します私のDataFrameには私のCSVの行と最後にクラスタ予測があります。

新しい列を.WithColumn()で追加しようとしましたが、何も見つかりませんでした。

ありがとうございました。あなたは、両方のデータフレーム上の共通フィールドを持っている場合は

答えて

0

、そしてそれ以外のユニークなIDを作成し、キーと結合して生成するために、単一のデータフレーム

ScalaのコードでCSVとそのクラスタ予測の行を取得するには、両方のデータフレームに参加各行の一意のID、pysparkの変換を試みます。あなたは増加行IDを生成し、行ID

import org.apache.spark.sql.types.{StructType, StructField, LongType} 
val df = sc.parallelize(Seq(("abc", 2), ("def", 1), ("hij", 3))).toDF("word", "count") 
val wcschema = df.schema 
val inputRows = df.rdd.zipWithUniqueId.map{ 
    case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)} 
val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields)) 

またはSQLクエリに

val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount") 
tmpTable1.show() 
を使用してそれらを結合する必要があります
関連する問題