複数のレコーダーの実行時間を計算します。同時に無限にレコーダーを走らせることができます。実行を知らないランタイム計算が開始されました
私が開始点と終了点を持つとき、次のコードスニペットで期待される結果が得られます。私の問題へ
今+-------------------+------+---+-------+-------+-------+---------+-------+
| timestamp|status|msg|started|stopped|engFlag|engWindow|runtime|
+-------------------+------+---+-------+-------+-------+---------+-------+
|2017-01-01 06:00:00| start| 1| 1| 0| 1| 1| 60.0|
|2017-01-01 07:00:00| start| 2| 1| 0| 1| 2| 120.0|
|2017-01-01 08:00:00| foo| 2| 0| 0| 0| 2| 120.0|
|2017-01-01 09:00:00| blub| 2| 0| 0| 0| 2| 120.0|
|2017-01-01 10:00:00| stop| 3| 0| -1| -1| 1| 60.0|
|2017-01-01 11:00:00| null| 3| 0| 0| 0| 1| 60.0|
|2017-01-01 12:00:00| ASC_c| 4| 0| 0| 0| 1| 60.0|
|2017-01-01 13:00:00| stop| 5| 0| -1| -1| 0| 0.0|
|2017-01-01 14:00:00| null| 3| 0| 0| 0| 0| 0.0|
|2017-01-01 15:00:00| ASC_c| 4| 0| 0| 0| 0| 0.0|
+-------------------+------+---+-------+-------+-------+---------+-------+
:
val ds2 = ds
.withColumn("started", when($"status" === "start", 1).otherwise(lit(0)))
.withColumn("stopped", when($"status" === "stop", -1).otherwise(lit(0)))
.withColumn("engFlag", when($"started" === 1, $"started").otherwise($"stopped"))
.withColumn("engWindow", sum($"engFlag").over(Window.orderBy($"timestamp")))
.withColumn("runtime", when($"engWindow" > 0,
(unix_timestamp(lead($"timestamp", 1).over(Window.orderBy($"timestamp"))) - unix_timestamp($"timestamp"))/60*$"engWindow").otherwise(lit(0)))
入力データ:
val ds_working = spark.sparkContext.parallelize(Seq(
("2017-01-01 06:00:00", "start", "1"),
("2017-01-01 07:00:00", "start", "2"),
("2017-01-01 08:00:00", "foo", "2"),
("2017-01-01 09:00:00", "blub", "2"),
("2017-01-01 10:00:00", "stop", "3"),
("2017-01-01 11:00:00", null, "3"),
("2017-01-01 12:00:00", "ASC_c", "4"),
("2017-01-01 13:00:00", "stop", "5"),
("2017-01-01 14:00:00", null, "3"),
("2017-01-01 15:00:00", "ASC_c", "4")
)).toDF("timestamp", "status", "msg")
出力
私は途中で計算を開始場合、私は、ランタイムを計算する方法が分かりませんランニングレコーダー。つまり、開始フラグは表示されませんが、停止フラグは表示されません。これは、過去に開始フラグが発生していなければならないことを示しています。
データ:
val ds_notworking = spark.sparkContext.parallelize(Seq(
("2017-01-01 02:00:00", "foo", "1"),
("2017-01-01 03:00:00", null, "2"),
("2017-01-01 04:00:00", "stop", "1"),
("2017-01-01 05:00:00", "stop", "2"),
("2017-01-01 06:00:00", "start", "1"),
("2017-01-01 07:00:00", "start", "2"),
("2017-01-01 08:00:00", "foo", "2"),
("2017-01-01 09:00:00", "blub", "2"),
("2017-01-01 10:00:00", "stop", "3"),
("2017-01-01 11:00:00", null, "3"),
("2017-01-01 12:00:00", "ASC_c", "4"),
("2017-01-01 13:00:00", "stop", "5"),
("2017-01-01 14:00:00", null, "3"),
("2017-01-01 15:00:00", "ASC_c", "4"),
)).toDF("timestamp", "status", "msg")
募集出力:
+-------------------+------+---+-------+-------+---------+-----+
| timestamp|status|msg|started|stopped|engWindow|runt |
+-------------------+------+---+-------+-------+---------+-----+
|2017-01-01 02:00:00| foo| 1| 0| 0| 0| 120 |
|2017-01-01 03:00:00| null| 2| 0| 0| 0| 120 |
|2017-01-01 04:00:00| stop| 1| 0| -1| -1| 60 |
|2017-01-01 05:00:00| stop| 2| 0| -1| -1| 0 |
|2017-01-01 06:00:00| start| 1| 1| 0| 1| 60 |
|2017-01-01 07:00:00| start| 2| 1| 0| 1| 120 |
|2017-01-01 08:00:00| foo| 2| 0| 0| 0| 120 |
|2017-01-01 09:00:00| blub| 2| 0| 0| 0| 120 |
|2017-01-01 10:00:00| stop| 3| 0| -1| -1| 60 |
|2017-01-01 11:00:00| null| 3| 0| 0| 0| 60 |
|2017-01-01 12:00:00| ASC_c| 4| 0| 0| 0| 60 |
|2017-01-01 13:00:00| stop| 5| 0| -1| -1| 0 |
|2017-01-01 14:00:00| null| 3| 0| 0| 0| 0 |
|2017-01-01 15:00:00| ASC_c| 4| 0| 0| 0| 0 |
+-------------------+------+---+-------+-------+---------+-----+
レコーダーのインスタンスが1つだけで同時に実行することができたとき、私はこの問題を解決しています
.withColumn("engWindow", last($"engFlag", true).over(systemWindow.rowsBetween(Window.unboundedPreceding, 0)))
しかし、 2つ以上のインスタンスでは、悲しいことに私はこれを達成するための手がかりがありません。 誰かが私を正しい方向に向けることができたらうれしいです。