0

私はstart_timeend_timeの列を持つDataFrameを持っています。私は窓を設定したいと思います。各観測のウィンドウは終了時刻までに2行あり、のデータに限定されています。その前に観測のstart_timeがあります。スパークウィンドウ関数:範囲の異なる列を参照する

例データ:

data = [('a', 10, 12, 5),('b', 20, 25, 10),('c', 30, 60, 15),('d', 40, 45, 20),('e', 50, 70, 25)] 
df = sqlContext.createDataFrame(data, ['name', 'start_time', 'end_time', 'resource']) 
+----+----------+--------+--------+ 
|name|start_time|end_time|resource| 
+----+----------+--------+--------+ 
| a|  10|  12|  5| 
| b|  20|  25|  10| 
| c|  30|  60|  15| 
| d|  40|  45|  20| 
| e|  50|  70|  25| 
+----+----------+--------+--------+ 

だから、 'E' のためのウィンドウには、終了時間<開始時間の制限なしに 'B' と 'D' ではなく、 'C'

を含める必要があります私はrangeBetween()に見えたが、私は現在の行のstart_timeを参照する方法を見つけ出すことができない、または私は制限したいという

from pyspark.sql import Window   
from pyspark.sql import functions as func 
window = Window.orderBy("name").rowsBetween(-2, -1) 
df.select('*', func.avg("resource").over(window).alias("avg")).show() 

を使用することができましたそれは他の行のend_timeによってそれです。 Window.currentRowがありますが、この例では値が参照されるのはresourceです

これはWindowで可能ですか?私は何か他のことを完全に試みるべきですか?

編集:重要な場合は、Spark 2.1.1とPython 2.7+を使用してください。

+0

'partitionBy'とは何ですか?それがなければ、同じパーティション内のすべての行と大規模なデータセットでそれを殺す単一のエグゼキュータで終わるでしょう。 –

+0

ええ、私の実際のデータはかなり大きいので、うまくいきそうな 'partitionBy'を持っています - それぞれのグループにa、b、cなどの' name'のグループがあります。コードを実行すると終了時刻<現在の行開始時刻のデータだけを考慮することの制限なしに、問題はないようです。私はこの制限を統合するのが難しいです。 – aku

答えて

1

実際にはgroupBy関数を使用して異なるパーティションに集約し、同じ共通キーを使用して出力データフレーム間で内部結合を使用できます。あなたができるならば、代わりにgroupbyを使うほうが良いでしょう。

1

純粋にウィンドウを使用してこれを行うことはできません。特定の行から、条件を満たす2つのヒットが得られるまで、前の行を逆順ソート順で処理する必要があります。

ウィンドウ関数を使用して、各行で検出されたすべての以前の値のリストを作成し、純粋なscala/pythonを使用してUDFを作成して、合計を判断することができます。 Scalaで

val window = Window.partitionBy(???).orderBy("end_time").rowsBetween(Long.MinValue, -1) 

val udfWithSelectionLogic = udf { values: Seq[Row] => INSERT_LOGIC_HERE_TO_CALCULATE_AGGREGATE } 

val dataPlus = data.withColumn("combined", struct($"start_time", $"end_time", $"resource")) 
     .withColumn("collected", collect_list($"combined") over window) 
     .withColumn("result", udfWithSelectionLogic($"collected")) 

これは理想的ではありませんが、役に立つかもしれません。