私は、その各ブロックのようになり、かなり大きなRDDに多数の計算を実行するためにPySparkを使用している:PySpark窓関数理解
ID CHK C1 Flag1 V1 V2 C2 Flag2 V3 V4
341 10 100 TRUE 10 10 150 FALSE 10 14
341 9 100 TRUE 10 10 150 FALSE 10 14
341 8 100 TRUE 14 14 150 FALSE 10 14
341 7 100 TRUE 14 14 150 FALSE 10 14
341 6 100 TRUE 14 14 150 FALSE 10 14
341 5 100 TRUE 14 14 150 FALSE 10 14
341 4 100 TRUE 14 14 150 FALSE 12 14
341 3 100 TRUE 14 14 150 FALSE 14 14
341 2 100 TRUE 14 14 150 FALSE 14 14
341 1 100 TRUE 14 14 150 FALSE 14 14
341 0 100 TRUE 14 14 150 FALSE 14 14
私はIDの多くの出現を持っている(これはC1に依存値は、例えば100から130までです。多くのC1では、各整数に対して上記のような11行のセットがあります)、私は多くのIDを持っています。 (私はこの便利な記事に見られるような:https://arundhaj.com/blog/calculate-difference-with-previous-row-in-pyspark.html)私が何をしたか
D1 = ((row.V1 - prev_row.V1)/2)/((row.V2 + prev_row.V2)/2)
D2 = ((row.V3 - prev_row.V3)/2)/((row.V4 + prev_row.V4)/2)
は:ウィンドウを定義することです:私は何をする必要があり、各列のグループに式を適用して計算されます2つの列を追加することです
私は一時列を処分した最後df = df.withColumn("prev_V1", lag(df.V1).over(my_window))
df = df.withColumn("prev_V21", lag(df.TA1).over(my_window))
df = df.withColumn("prev_V3", lag(df.SSQ2).over(my_window))
df = df.withColumn("prev_V4", lag(df.TA2).over(my_window))
df = df.withColumn("Sub_V1", F.when(F.isnull(df.V1 - df.prev_V1), 0).otherwise((df.V1 - df.prev_V1)/2))
df = df.withColumn("Sub_V2", (df.V2 + df.prev_V2)/2)
df = df.withColumn("Sub_V3", F.when(F.isnull(df.V3 - df.prev_V3), 0).otherwise((df.V3 - df.prev_V3)/2))
df = df.withColumn("Sub_V4", (df.V4 + df.prev_V4)/2)
df = df.withColumn("D1", F.when(F.isnull(df.Sub_V1/df.Sub_V2), 0).otherwise(df.Sub_V1/df.Sub_V2))
df = df.withColumn("D2", F.when(F.isnull(df.Sub_V3/df.Sub_V4), 0).otherwise(df.Sub_V3/df.Sub_V4))
:
my_window = Window.partitionBy().orderBy(desc("CHK"))
と、私は「一時」の列を作成し、各中間計算の
final_df = df.select(*columns_needed)
それは長いへの道を取って、私が取得保管:
WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
上記のコードブロックは、計算を行うために、ループのためのカップルの内側にあるように私が正しく、私はこれをやっておりませんことを知っていますすべてのIDのために、すなわち使用してループ:
unique_IDs = list(df1.toPandas()['ID'].unique())
が、PySparkウィンドウ関数の詳細に見た後、私は正しく、ウィンドウpartitionBy()を設定することによって、私は同じ結果が道簡単に得ることができると信じています。
私はAvoid performance impact of a single partition mode in Spark window functionsを見ましたが、まだ私はこの作業をするために私のウィンドウ区画を正しく設定する方法がわかりません。
誰かが私にこれにどのように取り組むことができるかについての助けや洞察力を提供することはできますか?
あなたは
は、私は何をする必要があり、各式を適用しています行のグループ - あなたはIDとC1のグループを意味しますか?もしそうなら、あなたのパーティションはそれらの列になければなりません。 – Suresh