私はstart_time
とend_time
の列を持つDataFrameを持っています。私は窓を設定したいと思います。各観測のウィンドウは終了時刻までに2行あり、のデータに限定されています。その前に観測のstart_time
があります。スパークウィンドウ関数:範囲の異なる列を参照する
例データ:
data = [('a', 10, 12, 5),('b', 20, 25, 10),('c', 30, 60, 15),('d', 40, 45, 20),('e', 50, 70, 25)]
df = sqlContext.createDataFrame(data, ['name', 'start_time', 'end_time', 'resource'])
+----+----------+--------+--------+
|name|start_time|end_time|resource|
+----+----------+--------+--------+
| a| 10| 12| 5|
| b| 20| 25| 10|
| c| 30| 60| 15|
| d| 40| 45| 20|
| e| 50| 70| 25|
+----+----------+--------+--------+
だから、 'E' のためのウィンドウには、終了時間<開始時間の制限なしに 'B' と 'D' ではなく、 'C'
を含める必要があります私はrangeBetween()
に見えたが、私は現在の行のstart_time
を参照する方法を見つけ出すことができない、または私は制限したいという
from pyspark.sql import Window
from pyspark.sql import functions as func
window = Window.orderBy("name").rowsBetween(-2, -1)
df.select('*', func.avg("resource").over(window).alias("avg")).show()
を使用することができましたそれは他の行のend_time
によってそれです。 Window.currentRow
がありますが、この例では値が参照されるのはresource
です
これはWindowで可能ですか?私は何か他のことを完全に試みるべきですか?
編集:重要な場合は、Spark 2.1.1とPython 2.7+を使用してください。
'partitionBy'とは何ですか?それがなければ、同じパーティション内のすべての行と大規模なデータセットでそれを殺す単一のエグゼキュータで終わるでしょう。 –
ええ、私の実際のデータはかなり大きいので、うまくいきそうな 'partitionBy'を持っています - それぞれのグループにa、b、cなどの' name'のグループがあります。コードを実行すると終了時刻<現在の行開始時刻のデータだけを考慮することの制限なしに、問題はないようです。私はこの制限を統合するのが難しいです。 – aku