spark-sqlソリューションのpysparkは、このようになります。
まずサンプルデータセットを作成します。データセットに加えて、グループおよび行のrow_numberフィールドパーティションをタイムスタンプで生成します。その後、我々はテーブルとして生成されたデータフレームを登録データフレームをテーブルとして登録されると(table1
)table1
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import unix_timestamp
df = spark.createDataFrame([
('2017-01-01 02:03:11','healthy','000001'),
('2017-01-01 02:03:04','healthy','000001'),
('2017-01-01 02:03:03','unhealthy','000001'),
('2017-01-01 02:03:00','unhealthy','000001'),
('2017-01-01 02:02:58','healthy','000008'),
('2017-01-01 02:02:57','healthy','000008'),
('2017-01-01 02:02:55','unhealthy','000001'),
('2017-01-01 02:02:54','healthy','000001'),
('2017-01-01 02:02:50','healthy','000007'),
('2017-01-01 02:02:48','healthy','000004')
],['timestamp','state','group_id'])
df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp))))
df.registerTempTable("table1")
を言います。必要なデータは、サンプルdatesetのみGROUP_ID 00001
ため不健康なデータを持っていた火花SQL
>>> spark.sql("""
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration
... FROM
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1
... LEFT JOIN
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2
... ON t1.group_id = t2.group_id
... AND t1.rownum = t2.rownum - 1
... group by t1.group_id
... """).show()
+--------+--------+
|group_id|duration|
+--------+--------+
| 000001| 9|
+--------+--------+
を用いて以下のように計算することができます。この解決策は、健康でない状態の他のgroup_idsの場合に有効です。
掲載されたソリューションのいずれかが機能しましたか? –
@ rogue-one - ありがとう、あなたの実装は非常によく説明され、詳細でした!ステップを歩いて、それは非常に華麗で賢いようです。 これを拡張すると、グループが不健全な状態になっている各期間の長さを取得する方法がありますか(たとえば、グループ000001に複数の不健全な状態がある場合は、それぞれの長さを返します)。 –
't2.timestamp_value - t1を実行した場合。timestamp_value'を私のクエリで合計とグループ化せずにすると、期間を取得できるはずです。 –