2017-10-03 13 views
1

私は、その各ブロックのようになり、かなり大きなRDDに多数の計算を実行するためにPySparkを使用している:PySpark窓関数理解

ID CHK C1 Flag1 V1 V2 C2 Flag2 V3 V4 
341 10 100 TRUE 10 10 150 FALSE 10 14 
341 9 100 TRUE 10 10 150 FALSE 10 14 
341 8 100 TRUE 14 14 150 FALSE 10 14 
341 7 100 TRUE 14 14 150 FALSE 10 14 
341 6 100 TRUE 14 14 150 FALSE 10 14 
341 5 100 TRUE 14 14 150 FALSE 10 14 
341 4 100 TRUE 14 14 150 FALSE 12 14 
341 3 100 TRUE 14 14 150 FALSE 14 14 
341 2 100 TRUE 14 14 150 FALSE 14 14 
341 1 100 TRUE 14 14 150 FALSE 14 14 
341 0 100 TRUE 14 14 150 FALSE 14 14 

私はIDの多くの出現を持っている(これはC1に依存値は、例えば100から130までです。多くのC1では、各整数に対して上記のような11行のセットがあります)、私は多くのIDを持っています。 (私はこの便利な記事に見られるような:https://arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html)私が何をしたか

D1 = ((row.V1 - prev_row.V1)/2)/((row.V2 + prev_row.V2)/2) 
D2 = ((row.V3 - prev_row.V3)/2)/((row.V4 + prev_row.V4)/2) 

は:ウィンドウを定義することです:私は何をする必要があり、各列のグループに式を適用して計算されます2つの列を追加することです

私は一時列を処分した最後

df = df.withColumn("prev_V1", lag(df.V1).over(my_window)) 
df = df.withColumn("prev_V21", lag(df.TA1).over(my_window)) 
df = df.withColumn("prev_V3", lag(df.SSQ2).over(my_window)) 
df = df.withColumn("prev_V4", lag(df.TA2).over(my_window)) 
df = df.withColumn("Sub_V1", F.when(F.isnull(df.V1 - df.prev_V1), 0).otherwise((df.V1 - df.prev_V1)/2)) 
df = df.withColumn("Sub_V2", (df.V2 + df.prev_V2)/2) 
df = df.withColumn("Sub_V3", F.when(F.isnull(df.V3 - df.prev_V3), 0).otherwise((df.V3 - df.prev_V3)/2)) 
df = df.withColumn("Sub_V4", (df.V4 + df.prev_V4)/2) 
df = df.withColumn("D1", F.when(F.isnull(df.Sub_V1/df.Sub_V2), 0).otherwise(df.Sub_V1/df.Sub_V2)) 
df = df.withColumn("D2", F.when(F.isnull(df.Sub_V3/df.Sub_V4), 0).otherwise(df.Sub_V3/df.Sub_V4)) 

my_window = Window.partitionBy().orderBy(desc("CHK")) 

と、私は「一時」の列を作成し、各中間計算の

final_df = df.select(*columns_needed) 

それは長いへの道を取って、私が取得保管:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 

上記のコードブロックは、計算を行うために、ループのためのカップルの内側にあるように私が正しく、私はこれをやっておりませんことを知っていますすべてのIDのために、すなわち使用してループ:

unique_IDs = list(df1.toPandas()['ID'].unique()) 

が、PySparkウィンドウ関数の詳細に見た後、私は正しく、ウィンドウpartitionBy()を設定することによって、私は同じ結果が道簡単に得ることができると信じています。

私はAvoid performance impact of a single partition mode in Spark window functionsを見ましたが、まだ私はこの作業をするために私のウィンドウ区画を正しく設定する方法がわかりません。

誰かが私にこれにどのように取り組むことができるかについての助けや洞察力を提供することはできますか?

あなたは

+1

は、私は何をする必要があり、各式を適用しています行のグループ - あなたはIDとC1のグループを意味しますか?もしそうなら、あなたのパーティションはそれらの列になければなりません。 – Suresh

答えて

0

私は式はIDの各グループ(それは私が「ID」に分割することを選んだ理由です)に適用されなければならないことを前提と感謝します。

あなたはこのようなもので「TEMP」の列を使用して避けることができ:

# used to define the lag of a specific column 
w_lag=Window.partitionBy("id","C1").orderBy(desc('chk')) 


df = df.withColumn('D1',((df.V1-F.lag(df.V1).over(w_lag))/2)\ 
         /((df.V2+F.lag(df.V2).over(w_lag))/2)) 

df = df.withColumn('D2',((df.V3-F.lag(df.V3).over(w_lag))/2)\ 
         /((df.V4+F.lag(df.V4).over(w_lag))/2)) 

結果は次のとおりです。

+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
| id|chk| C1|Flag1| V1| V2| C2|Flag2| V3| V4|     D1|     D2| 
+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
|341| 10|100| true| 10| 10|150| true| 10| 14|    null|    null| 
|341| 9|100| true| 10| 10|150| true| 10| 14|    0.0|    0.0| 
|341| 8|100| true| 14| 14|150| true| 10| 14|0.16666666666666666|    0.0| 
|341| 7|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 6|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 5|100| true| 14| 14|150| true| 10| 14|    0.0|    0.0| 
|341| 4|100| true| 14| 14|150| true| 12| 14|    0.0|0.07142857142857142| 
|341| 3|100| true| 14| 14|150| true| 14| 14|    0.0|0.07142857142857142| 
|341| 2|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
|341| 1|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
|341| 0|100| true| 14| 14|150| true| 14| 14|    0.0|    0.0| 
+---+---+---+-----+---+---+---+-----+---+---+-------------------+-------------------+ 
+1

これは、私が念頭に置いていたものに近いようですが、私の唯一の懸念は、これがデータセット全体で機能するかどうかということです。次の "データブロック"はid = 341、chk = 10 - > 0でもC1は110です。 'chk'))これを解決するだろうか? 最後に、私はD1 = 0.16666とちょっとしたことがあると思います。((14-10)/ 2)/(14 + 14/2)= 2/14 = 0.1428 – Swan87

+0

私は自分の答えを編集しました。確かに:Window.partitionBy( "id"、 "C1")あなたが望むもののために良いはずです。 D1 = 0である。16666:私はそれが((14-10)/ 2)/((14 + 10)/ 2)= 2/12 = 0.1666 – plalanne

+0

だと思います。レスポンスありがとう! – Swan87