0

ログを含むPysparkデータフレームがあり、各行はログに記録された時点のシステムの状態に対応しています。私は各グループが不健康な状態にある期間の長さを見たいと思います。例えば特定のステータス(列の値)を持つ行に対する期間の分布

これが私のテーブルだった場合、:

TIMESTAMP | STATUS_CODE | GROUP_NUMBER 
-------------------------------------- 
02:03:11 | healthy  | 000001 
02:03:04 | healthy  | 000001 
02:03:03 | unhealthy | 000001 
02:03:00 | unhealthy | 000001 
02:02:58 | healthy  | 000008 
02:02:57 | healthy  | 000008 
02:02:55 | unhealthy | 000001 
02:02:54 | healthy  | 000001 
02:02:50 | healthy  | 000007 
02:02:48 | healthy  | 000004 

私は(2時02分55秒から2時03分04秒まで)9秒の不健康な期間を持つグループ000001を返すようにしたいでしょう。

他のグループもまた不健全な期間を持つ可能性があります。また、これらのグループも返信したいと考えています。

同じステータスの連続する行の可能性と異なるグループの行が散在しているため、これを効率的に実行する方法を見つけるのには苦労しています。

PysparkデータフレームをPandasデータフレームに変換することはできません。

これらの期間の長さを効率的に決定するにはどうすればよいですか?

ありがとうございます!

+0

掲載されたソリューションのいずれかが機能しましたか? –

+0

@ rogue-one - ありがとう、あなたの実装は非常によく説明され、詳細でした!ステップを歩いて、それは非常に華麗で賢いようです。 これを拡張すると、グループが不健全な状態になっている各期間の長さを取得する方法がありますか(たとえば、グループ000001に複数の不健全な状態がある場合は、それぞれの長さを返します)。 –

+0

't2.timestamp_value - t1を実行した場合。timestamp_value'を私のクエリで合計とグループ化せずにすると、期間を取得できるはずです。 –

答えて

0

spark-sqlソリューションのpysparkは、このようになります。

まずサンプルデータセットを作成します。データセットに加えて、グループおよび行のrow_numberフィールドパーティションをタイムスタンプで生成します。その後、我々はテーブルとして生成されたデータフレームを登録データフレームをテーブルとして登録されると(table1table1

from pyspark.sql.window import Window 
from pyspark.sql.functions import row_number 
from pyspark.sql.functions import unix_timestamp 

df = spark.createDataFrame([ 
('2017-01-01 02:03:11','healthy','000001'), 
('2017-01-01 02:03:04','healthy','000001'), 
('2017-01-01 02:03:03','unhealthy','000001'), 
('2017-01-01 02:03:00','unhealthy','000001'), 
('2017-01-01 02:02:58','healthy','000008'), 
('2017-01-01 02:02:57','healthy','000008'), 
('2017-01-01 02:02:55','unhealthy','000001'), 
('2017-01-01 02:02:54','healthy','000001'), 
('2017-01-01 02:02:50','healthy','000007'), 
('2017-01-01 02:02:48','healthy','000004') 
],['timestamp','state','group_id']) 

df = df.withColumn('rownum', row_number().over(Window.partitionBy(df.group_id).orderBy(unix_timestamp(df.timestamp)))) 

df.registerTempTable("table1") 

を言います。必要なデータは、サンプルdatesetのみGROUP_ID 00001ため不健康なデータを持っていた火花SQL

>>> spark.sql(""" 
... SELECT t1.group_id,sum((t2.timestamp_value - t1.timestamp_value)) as duration 
... FROM 
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1 WHERE state = 'unhealthy') t1 
... LEFT JOIN 
... (SELECT unix_timestamp(timestamp) as timestamp_value,group_id,rownum FROM table1) t2 
... ON t1.group_id = t2.group_id 
... AND t1.rownum = t2.rownum - 1 
... group by t1.group_id 
... """).show() 
+--------+--------+ 
|group_id|duration| 
+--------+--------+ 
| 000001|  9| 
+--------+--------+ 

を用いて以下のように計算することができます。この解決策は、健康でない状態の他のgroup_idsの場合に有効です。

0

一つの簡単な方法は、(最適ではないかもしれない)である。

  1. マップは、同じパーティション内のすべての単一のグループのすべてのデータを持つことになりますので、鍵K
  2. 使用repartitionAndSortWithinPartitionsとしてGROUP_NUMBERと[K,V]TIMESTAMPでソートしてください。詳細な説明はこの回答にあります。Pyspark: Using repartitionAndSortWithinPartitions with multiple sort Critiria
  3. 最後にmapPartitionsを使用して、ソートされたデータを1つのパーティションにまたがってイテレータを取得するので、必要な答えを簡単に見つけることができます。 (mapPartitionsの説明:How does the pyspark mapPartitions function work?
関連する問題