2017-10-18 13 views
0

私はidcolAという2つのinicialカラムを持つデータフレームで作業しています。PySpark:もう1つの列をdataFrameに追加するにはどうすればよいですか?

+---+-----+ 
|id |colA | 
+---+-----+ 
| 1 | 5 | 
| 2 | 9 | 
| 3 | 3 | 
| 4 | 1 | 
+---+-----+ 

私は、もっと別の列にCOLBをそのデータフレームをマージする必要があります。私はcolBがdataFrameの最後に完全に合っていることを知っています。これらの結果は

+-----+ 
|colB | 
+-----+ 
| 5 | 
| 9 | 
| 3 | 
| 1 | 
+-----+ 

、私は以下のような新しいDATAFRAME取得する必要があります。

+---+-----+-----+ 
|id |colA |colB | 
+---+-----+-----+ 
| 1 | 5 | 8 | 
| 2 | 9 | 7 | 
| 3 | 3 | 0 | 
| 4 | 1 | 6 | 
+---+-----+-----+ 

これは最初のデータフレームを取得するためにpysparkコードです:

l=[(1,5),(2,9), (3,3), (4,1)] 
names=["id","colA"] 
db=sqlContext.createDataFrame(l,names) 
db.show() 

どのように私がやる?誰も助けてくれませんか?ありがとう

+0

あなたはスパークにデータフレームに任意の列を追加することはできません - ここに広範な答えを参照してください。https://stackoverflow.com/questions/33681487/how-do-i-add-a -new-column-to-a-spark-dataframe-using-pyspark – desertnaut

+0

[PySparkを使用してSpark DataFrameに新しい列を追加する方法は?](https://stackoverflow.com/questions/33681487)新しい列を追加する方法) – desertnaut

答えて

0

私はやった!私は行のインデックスと一時的な列を追加し、それを削除して解決しました。

コード:

from pyspark.sql import Row 
from pyspark.sql.window import Window 
from pyspark.sql.functions import rowNumber 
w = Window().orderBy() 

l=[(1,5),(2,9), (3,3), (4,1)] 
names=["id","colA"] 
db=sqlContext.createDataFrame(l,names) 
db.show() 

l=[5,9,3,1] 
rdd = sc.parallelize(l).map(lambda x: Row(x)) 
test_df = rdd.toDF() 
test_df2 = test_df.selectExpr("_1 as colB") 
dbB = test_df2.select("colB") 

db= db.withColum("columnindex", rowNumber().over(w)) 
dbB = dbB.withColum("columnindex", rowNumber().over(w)) 


testdf_out = db.join(dbB, db.columnindex == dbB.columnindex. 'inner').drop(db.columnindex).drop(dbB.columnindex) 
testdf_out.show() 
+0

monotonically_increasing_idを使用して、各データフレームに直接テンポラリインデックスを作成して結合することもできます。同様にチェックしてください。 – Suresh

関連する問題