2017-12-18 8 views
0

配列から、新しい列にPysparkデータフレームの列を変換します私はこのような構造でPyspark DATAFRAMEをしました

root 
|-- Id: string (nullable = true) 
|-- Q: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- pr: string (nullable = true) 
| | |-- qt: double (nullable = true) 

に似た何か:

+----+--------------------- ... --+ 
| Id |   Q    | 
+----+---------------------- ... -+ 
| 001| [ [pr1,1.9], [pr3,2.0]...] | 
| 002| [ [pr2,1.0], [pr9,3.9]...] | 
| 003| [ [pr2,9.0], ...   ] | 
    ... 

私は(列にQ列に変換したいウォルド名前pr値qt)。 また、同じ列をマージ(追加)して重複した列を避けたいと思います。

+----+-----+-----+------+ ... ----+ 
| Id | pr1 | pr2 | pr3 | ... prn | 
+----+-----+-----+------+ ... ----+ 
| 001| 1.9 | 0.0 | 2.0 | ...  | 
| 002| 0.0 | 1.0 | 0 | ...  | 
| 003| 0.0 | 9.0 | ... | ...  | 
    ... 

この変換はどのように実行できますか。 事前に!! Julián。

+0

こんにちは、THX – ags29

+0

はいags29、答えは働いていたかあなたが他の質問がある場合は私に知らせて、ありがとうございました!!! –

答えて

2

あなたはexplodepivotの組み合わせでこれを行うことができます。

import pyspark.sql.functions as F 

# explode to get "long" format 
df=df.withColumn('exploded', F.explode('Q')) 

# get the name and the name in separate columns 
df=df.withColumn('name', F.col('exploded').getItem(0)) 
df=df.withColumn('value', F.col('exploded').getItem(1)) 

# now pivot 
df.groupby('Id').pivot('name').agg(F.max('value')).na.fill(0) 
0

非常に興味深い質問を。これが私のアプローチです。

のtest.CSV

001,pr1:0.9,pr3:1.2,pr2:2.0 
002,pr3:5.2,pr4:0.99 

Pyspark

file = sc.textFile("file:///test2.csv") 

//get it in (key,value) 
//[(u'001', u'pr1:0.9')...] 

//rdd1 = file.map(lambda r: r.replace(",","\t",1)).map(lambda r: r.split("\t")).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(',')) 
rdd1 = file.map(lambda r: r.split(",")[0]).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(',')) 

//create a DF with 3 columns 
//[(u'001', u'pr1', u'0.9')...)] 
+---+---+----+ 
| _1| _2| _3| 
+---+---+----+ 
|001|pr1| 0.9| 
|001|pr3| 1.2| 
|001|pr2| 2.0| 
|002|pr3| 5.2| 
|002|pr4|0.99| 
+---+---+----+ 


rdd2 = rdd1.map(lambda r: (r[0],r[1].split(":"))).map(lambda r: (r[0],r[1][0],r[1][1])) 
df = rdd2.toDF() 


//Perform the magic 
df.groupBy("_1").pivot("_2").agg(expr("coalesce(first(_3),0)")) 


+---+---+---+---+----+ 
| _1|pr1|pr2|pr3| pr4| 
+---+---+---+---+----+ 
|001|0.9|2.0|1.2| 0| 
|002| 0| 0|5.2|0.99| 
+---+---+---+---+----+ 
+0

タクヨンバラ、それは良い解決策です。 ags29が提案したものより少し長いかもしれません。 –

関連する問題