2016-11-15 14 views
1

PythonとPySparkの新機能です。私は、次のようなPySparkでデータフレームを持っている:インデックスがデータフレームの1つの列にあるリストを使用してPySparkデータフレームに列を作成します。

## +---+---+------+ 
## | x1| x2| x3 | 
## +---+---+------+ 
## | 0| a | 13.0| 
## | 2| B | -33.0| 
## | 1| B | -63.0| 
## +---+---+------+ 

Iアレイを有する: ARR = [10、12、13]

私はそれがなければならないこと、そのようなデータフレームの列X4を作成しますインデックスとしてx1の値に基づいてリストから対応する値を持つ。

df.withColumn("x4", lit(arr[col('x1')])).show()

をしかし、私はエラーを取得しています:

IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices 

です私は達成するために、次のコードを使用してみましたが

## +---+---+------+-----+ 
## | x1| x2| x3 | x4 | 
## +---+---+------+-----+ 
## | 0| a | 13.0| 10 | 
## | 2| B | -33.0| 13 | 
## | 1| B | -63.0| 12 | 
## +---+---+------+-----+ 

:最終データセットは次のようになります。私はこれを効率的に達成する方法はありますか?

答えて

0

配列のインデックスと元のDataFrameの結合を行う場合、1つの方法は、配列をDataFrameに変換し、rownumber()-1(インデックスになります)を生成し、次に2つのDataFramesを結合します。

from pyspark.sql import Row 

# Create original DataFrame `df` 
df = sqlContext.createDataFrame(
    [(0, "a", 13.0), (2, "B", -33.0), (1, "B", -63.0)], ("x1", "x2", "x3")) 
df.createOrReplaceTempView("df") 

# Create column "x4" 
row = Row("x4") 

# Take the array 
arr = [10, 12, 13] 

# Convert Array to RDD, and then create DataFrame 
rdd = sc.parallelize(arr) 
df2 = rdd.map(row).toDF() 
df2.createOrReplaceTempView("df2") 

# Create indices via row number 
df3 = spark.sql("SELECT (row_number() OVER (ORDER by x4))-1 as indices, * FROM df2") 
df3.createOrReplaceTempView("df3") 

は今、次の2つのデータフレームを持っていること:dfdf3は、あなたが2つのデータフレームを結合するには、以下のSQLクエリを実行することができます。

select a.x1, a.x2, a.x3, b.x4 from df a join df3 b on b.indices = a.x1 

ここでは、adding columns to DataFramesの参考回答もあります。

関連する問題