2017-02-14 7 views
1

(編集2月14日)PySpark:インデックスに1列を使用し、別の(?2列のUDF)

のは、私は、次のスキーマを持つスパーク(PySpark)データフレームを持っているとしましょう:

root 
|-- myarray: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- myindices: array (nullable = true) 
| |-- element: integer (containsNull = true) 

それは次のようになります:

+--------------------+----------+ 
|   myarray | myindices| 
+--------------------+----------+ 
|     [A]| [0] | 
|    [B, C]| [1] | 
|  [D, E, F, G]| [0,2] | 
+--------------------+----------+ 

最初のインデックスにはどのように2番目の配列を使用できますか?

私の目標は、次のようになり、新たなデータフレームを作成することです:

+--------------------+----------+------+ 
|   myarray | myindices|result| 
+--------------------+----------+------+ 
|     [A]| [0] | [A] | 
|    [B, C]| [1] | [C] | 
|  [D, E, F, G]| [0,2] | [D,F]| 
+--------------------+----------+------+ 

myindicesの内容が常に問題となっている行のためmyarrayのカーディナリティの範囲内であることが保証されていると仮定しても安全ですしたがって、範囲外の問題はありません。)

.getItem()メソッドは1つの引数でしか動作しないため、ここではUDFが必要な場合がありますが、もっと多くのUDFを作成する方法はわかりません1つの列を入力として使用します。 UDFの有無にかかわらず、あらゆるソリューション?

+3

df.withColumn( 'アイテム'、DF [ 'myarrayの']のgetItem(DF [ 'myposition'])) –

+1

これが答えではなく、コメントであるべきである@zhangtong。 –

+0

@zhangtong:ありがとう。残念ながら私の実際のニーズはもう少し複雑です。私はこれをより明確にするために質問を編集しました。改訂された質問を見て、あなたに提案があるかどうか確認してください。 – xenocyon

答えて

2
from pyspark.sql import functions as f 

rdd = spark.sparkContext.parallelize([(['A'], [0]), (['B', 'C'], [1]), (['D', 'E', 'F'], [0, 2])]) 
df = spark.createDataFrame(rdd, ['myarray', 'myindices']) 
my_UDF = f.UserDefinedFunction(lambda x, y: map(lambda z: x[z], y), returnType=ArrayType(StringType())) 
res = df.withColumn('result', my_UDF(df['myarray'], df['myindices'])) 
res.show(truncate=False) 

output: 
+---------+---------+------+ 
|myarray |myindices|result| 
+---------+---------+------+ 
|[A]  |[0]  |[A] | 
|[B, C] |[1]  |[C] | 
|[D, E, F]|[0, 2] |[D, F]| 
+---------+---------+------+ 
+0

これは完璧に動作し、引数として2つの列を取るUDFのきちんとした例です。 – xenocyon

関連する問題