2017-09-06 10 views
0

複数のレコーダーの実行時間を計算します。同時に無限にレコーダーを走らせることができます。実行を知らないランタイム計算が開始されました

私が開始点と終了点を持つとき、次のコードスニペットで期待される結果が得られます。私の問題へ

+-------------------+------+---+-------+-------+-------+---------+-------+ 
|   timestamp|status|msg|started|stopped|engFlag|engWindow|runtime| 
+-------------------+------+---+-------+-------+-------+---------+-------+ 
|2017-01-01 06:00:00| start| 1|  1|  0|  1|  1| 60.0| 
|2017-01-01 07:00:00| start| 2|  1|  0|  1|  2| 120.0| 
|2017-01-01 08:00:00| foo| 2|  0|  0|  0|  2| 120.0| 
|2017-01-01 09:00:00| blub| 2|  0|  0|  0|  2| 120.0| 
|2017-01-01 10:00:00| stop| 3|  0|  -1|  -1|  1| 60.0| 
|2017-01-01 11:00:00| null| 3|  0|  0|  0|  1| 60.0| 
|2017-01-01 12:00:00| ASC_c| 4|  0|  0|  0|  1| 60.0| 
|2017-01-01 13:00:00| stop| 5|  0|  -1|  -1|  0| 0.0| 
|2017-01-01 14:00:00| null| 3|  0|  0|  0|  0| 0.0| 
|2017-01-01 15:00:00| ASC_c| 4|  0|  0|  0|  0| 0.0| 
+-------------------+------+---+-------+-------+-------+---------+-------+ 

val ds2 = ds 
     .withColumn("started", when($"status" === "start", 1).otherwise(lit(0))) 
     .withColumn("stopped", when($"status" === "stop", -1).otherwise(lit(0))) 
     .withColumn("engFlag", when($"started" === 1, $"started").otherwise($"stopped")) 
     .withColumn("engWindow", sum($"engFlag").over(Window.orderBy($"timestamp"))) 
     .withColumn("runtime", when($"engWindow" > 0, 
     (unix_timestamp(lead($"timestamp", 1).over(Window.orderBy($"timestamp"))) - unix_timestamp($"timestamp"))/60*$"engWindow").otherwise(lit(0))) 

入力データ:

val ds_working = spark.sparkContext.parallelize(Seq(
     ("2017-01-01 06:00:00", "start", "1"), 
     ("2017-01-01 07:00:00", "start", "2"), 
     ("2017-01-01 08:00:00", "foo", "2"), 
     ("2017-01-01 09:00:00", "blub", "2"), 
     ("2017-01-01 10:00:00", "stop", "3"), 
     ("2017-01-01 11:00:00", null, "3"), 
     ("2017-01-01 12:00:00", "ASC_c", "4"), 
     ("2017-01-01 13:00:00", "stop", "5"), 
     ("2017-01-01 14:00:00", null, "3"), 
     ("2017-01-01 15:00:00", "ASC_c", "4") 
    )).toDF("timestamp", "status", "msg") 

出力
私は途中で計算を開始場合、私は、ランタイムを計算する方法が分かりませんランニングレコーダー。つまり、開始フラグは表示されませんが、停止フラグは表示されません。これは、過去に開始フラグが発生していなければならないことを示しています。

データ:

val ds_notworking = spark.sparkContext.parallelize(Seq(
     ("2017-01-01 02:00:00", "foo", "1"), 
     ("2017-01-01 03:00:00", null, "2"), 
     ("2017-01-01 04:00:00", "stop", "1"), 
     ("2017-01-01 05:00:00", "stop", "2"), 
     ("2017-01-01 06:00:00", "start", "1"), 
     ("2017-01-01 07:00:00", "start", "2"), 
     ("2017-01-01 08:00:00", "foo", "2"), 
     ("2017-01-01 09:00:00", "blub", "2"), 
     ("2017-01-01 10:00:00", "stop", "3"), 
     ("2017-01-01 11:00:00", null, "3"), 
     ("2017-01-01 12:00:00", "ASC_c", "4"), 
     ("2017-01-01 13:00:00", "stop", "5"), 
     ("2017-01-01 14:00:00", null, "3"), 
     ("2017-01-01 15:00:00", "ASC_c", "4"), 
    )).toDF("timestamp", "status", "msg") 

募集出力:

+-------------------+------+---+-------+-------+---------+-----+ 
|   timestamp|status|msg|started|stopped|engWindow|runt | 
+-------------------+------+---+-------+-------+---------+-----+ 
|2017-01-01 02:00:00| foo| 1|  0|  0|  0| 120 | 
|2017-01-01 03:00:00| null| 2|  0|  0|  0| 120 | 
|2017-01-01 04:00:00| stop| 1|  0|  -1|  -1| 60 | 
|2017-01-01 05:00:00| stop| 2|  0|  -1|  -1| 0 | 
|2017-01-01 06:00:00| start| 1|  1|  0|  1| 60 | 
|2017-01-01 07:00:00| start| 2|  1|  0|  1| 120 | 
|2017-01-01 08:00:00| foo| 2|  0|  0|  0| 120 | 
|2017-01-01 09:00:00| blub| 2|  0|  0|  0| 120 | 
|2017-01-01 10:00:00| stop| 3|  0|  -1|  -1| 60 | 
|2017-01-01 11:00:00| null| 3|  0|  0|  0| 60 | 
|2017-01-01 12:00:00| ASC_c| 4|  0|  0|  0| 60 | 
|2017-01-01 13:00:00| stop| 5|  0|  -1|  -1| 0 | 
|2017-01-01 14:00:00| null| 3|  0|  0|  0| 0 | 
|2017-01-01 15:00:00| ASC_c| 4|  0|  0|  0| 0 | 
+-------------------+------+---+-------+-------+---------+-----+ 

レコーダーのインスタンスが1つだけで同時に実行することができたとき、私はこの問題を解決しています

.withColumn("engWindow", last($"engFlag", true).over(systemWindow.rowsBetween(Window.unboundedPreceding, 0))) 

しかし、 2つ以上のインスタンスでは、悲しいことに私はこれを達成するための手がかりがありません。 誰かが私を正しい方向に向けることができたらうれしいです。

答えて

0

私は答えを見つけたと思います。私はこの方法を複雑に考えていました。
このアプローチがうまくいかないケースがあるかどうかはまだわかりませんが、

私は作業例で行ったようにフラグを要約し、タイムスタンプ別にデータを降順に並べ替え、最小値を見つけてこの値を現在の値に追加します。これは、常に正しいレコーダーの数を示すはずです。

val ds2 = ds_notworking 
     .withColumn("started", when($"status" === "start", 1).otherwise(lit(0))) 
     .withColumn("stopped", when($"status" === "stop", -1).otherwise(lit(0))) 
     .withColumn("engFlag", when($"started" === 1, $"started").otherwise($"stopped")) 
     .withColumn("engWindow", sum($"engFlag").over(Window.orderBy($"timestamp"))) 
     .withColumn("newEngWindow", $"engWindow" - min($"engWindow").over(Window.orderBy($"timestamp".desc))) 
     .withColumn("runtime2", when($"newEngWindow" > 0, 
     (unix_timestamp(lead($"timestamp", 1).over(Window.orderBy($"timestamp"))) - unix_timestamp($"timestamp"))/60*$"newEngWindow").otherwise(lit(0))) 

EDIT:minimun値を計算してウィンドウ全体に適用する方が正しいかもしれません。

.withColumn("test1", last(min($"engWindow").over(Window.orderBy($"timestamp"))).over(Window.orderBy($"timestamp").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))) 

出力:

+-------------------+------+---+-------+-------+-------+---------+------------+--------+ 
|   timestamp|status|msg|started|stopped|engFlag|engWindow|newEngWindow|runtime2| 
+-------------------+------+---+-------+-------+-------+---------+------------+--------+ 
|2017-01-01 02:00:00| foo| 1|  0|  0|  0|  0|   2| 120.0| 
|2017-01-01 03:00:00| null| 2|  0|  0|  0|  0|   2| 120.0| 
|2017-01-01 04:00:00| stop| 1|  0|  -1|  -1|  -1|   1| 60.0| 
|2017-01-01 05:00:00| stop| 2|  0|  -1|  -1|  -2|   0|  0.0| 
|2017-01-01 06:00:00| start| 1|  1|  0|  1|  -1|   1| 60.0| 
|2017-01-01 07:00:00| start| 2|  1|  0|  1|  0|   2| 120.0| 
|2017-01-01 08:00:00| foo| 2|  0|  0|  0|  0|   2| 120.0| 
|2017-01-01 09:00:00| blub| 2|  0|  0|  0|  0|   2| 120.0| 
|2017-01-01 10:00:00| stop| 3|  0|  -1|  -1|  -1|   1| 60.0| 
|2017-01-01 11:00:00| null| 3|  0|  0|  0|  -1|   1| 60.0| 
|2017-01-01 12:00:00| ASC_c| 4|  0|  0|  0|  -1|   1| 60.0| 
|2017-01-01 13:00:00| stop| 5|  0|  -1|  -1|  -2|   0|  0.0| 
|2017-01-01 14:00:00| null| 3|  0|  0|  0|  -2|   0|  0.0| 
|2017-01-01 15:00:00| ASC_c| 4|  0|  0|  0|  -2|   0|  0.0| 
+-------------------+------+---+-------+-------+-------+---------+------------+--------+ 
関連する問題