2016-10-14 16 views
1

私はPandasで書かれたコードをPySparkに変換しています。コードには、ユーザー指定の入力に応じてさまざまな数の列を作成するためにループがたくさんあります(for)。PySpark DataFrameをループして新しい列を作成するより効率的な方法

私は、次のサンプルコードで、スパーク1.6.xのを使用しています:たとえば以下のため、

+----+ 
|val1| 
+----+ 
| 1| 
| 2| 
| 3| 
| 4| 
| 5| 
| 6| 
| 7| 
| 8| 
| 9| 
| 10| 
+----+ 

Iループのコードで多くを私に残し

from pyspark.sql import SQLContext 
from pyspark.sql import functions as F 
import pandas as pd 
import numpy as np 

# create a Pandas DataFrame, then convert to Spark DataFrame 
test = sqlContext.createDataFrame(pd.DataFrame({'val1': np.arange(1,11)})) 

になり
for i in np.arange(2,6).tolist(): 
    test = test.withColumn('val_' + str(i), F.lit(i ** 2) + test.val1) 

+----+-----+-----+-----+-----+ 
|val1|val_2|val_3|val_4|val_5| 
+----+-----+-----+-----+-----+ 
| 1| 5| 10| 17| 26| 
| 2| 6| 11| 18| 27| 
| 3| 7| 12| 19| 28| 
| 4| 8| 13| 20| 29| 
| 5| 9| 14| 21| 30| 
| 6| 10| 15| 22| 31| 
| 7| 11| 16| 23| 32| 
| 8| 12| 17| 24| 33| 
| 9| 13| 18| 25| 34| 
| 10| 14| 19| 26| 35| 
+----+-----+-----+-----+-----+ 

**質問:**上記のループをより効率的に書き換えるにはどうすればよいですか?

スパークが(2GBのテキスト入力のような小さなデータセットでも)ループの各グループで多くの時間を費やすので、自分のコードが遅くなることに気付きました。

ありがとうございます。

答えて

1

繰り返しJVMメソッドを呼び出す際のオーバーヘッドはわずかですが、それ以外の場合はループだけで問題になることはありません。

df = spark.range(1, 11).toDF("val1") 

def make_col(i): 
    return (F.pow(F.lit(i), 2) + F.col("val1")).alias("val_{0}".format(i)) 

spark.range(1, 11).toDF("val1").select("*", *(make_col(i) for i in range(2, 6))) 

また、NumPyタイプの使用を避けることもできます。 NumPyオブジェクトの初期化は通常、Pythonオブジェクトと比較して高価ですが、Spark SQLはNumPy型をサポートしていないため、追加の変換が必要になります。

+0

ありがとう、これは動作します。私はそれを私のコードにどうやって適用するかを考えます。私はSpark 1.6.xを使用しているので、コードを実行するときにエラーが発生します。主に '* .toDF(" val1 ")'というスキーマ型が必要であると不平を言います。 Spark 2.0.x上で正常に動作するので、修正が容易でなければならない –

-1

1つのwithColumnはrdd全体で機能します。だから一般的には、追加するすべての列に対してこのメ​​ソッドを使用するのは良い方法ではありません。マップ関数内で列とそのデータを操作する方法があります。 1つのマップ関数がここでジョブを実行しているので、新しい列とそのデータを追加するコードは並行して実行されます。

a。計算に基づいて新しい値を集めることができます

b。ここ

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

行以下のようにメインRDDにこれらの新しい列値を追加し、マップ法

Cの行の基準となります。

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

dのように新しいスキーマを作成します。古いスキーマに追加する

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。新しい列で新しいデータフレームを作成する

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+1

ありがとうございますが、Scalaはそれに従うのが少し難しいです。あなたは、DataFrame全体でwithColumnが動作していることを意味しています。私はそれを動作させるために 'map'をどのように使うのか、頭を抱かせることはできません。 –

+0

mapを使用すると、各行で操作が実行されます。つまり、各行に対して、新しい列の新しいスキーマを作成し、それらの列のデータを準備し、古いスキーマ(データフレームから取得できる)に上記の新しいスキーマを追加し、最後に新しい列で新しいデータフレームを作成します。あなたはそれを探している場合は、上記の手順をPythonで考えることができます – Ramzy

関連する問題