私のPySparkコードでは、DataFrame
にセンサからのデータが入力され、各行にタイムスタンプ、event_description、event_valueがあります。 各センサーイベントは、idと値で定義される測定値で構成されます。私が持っている唯一の保証は、単一のイベントに関連するすべての「フェーズ」が2つの行(ソートされていない)の間に含まれていることです。 各イベント「ブロック」内には、EV_CODE
に関連付けられた値であるイベントラベルがあります。行の値を区切り文字で区切って火花データフレームを分割します
+-------------------------+------------+-------------+
| timestamp | event_id | event_value |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:12.540 | EV_SEP | ----- |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:14.201 | EV_2 | 10 |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:13.331 | EV_1 | 11 |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:15.203 | EV_CODE | ABC |
+-------------------------+------------+-------------+
| 2017-01-01 00:00:16.670 | EV_SEP | ----- |
+-------------------------+------------+-------------+
私はすべてのイベントがそのラベルに関連付けられていることを知っているように私は、そのラベルを含む新しい列を作成したいと思います:パンダで
+-------------------------+----------+-------------+------------+
| timestamp | event_id | event_value | event_code |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:12.540 | EV_SEP | ----- | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:14.201 | EV_2 | 10 | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:13.331 | EV_1 | 11 | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:15.203 | EV_CODE | ABC | ABC |
+-------------------------+----------+-------------+------------+
| 2017-01-01 00:00:16.670 | EV_SEP | ----- | ABC |
+-------------------------+----------+-------------+------------+
を、私は簡単のインデックスを取得することができますテーブルをブロックに分割し、各ブロックからEV_CODE
を取り込み、その値を持つevent_code
列を作成します。
可能な解決策は、次のようになりますタイムスタンプ
- ソートstart_index、end_index)
- EV_CODEを抽出する単一のチャンク(インデックスでフィルタリング)を処理する
- は最終的に必要な列を作成します
この問題を解決する方法はありますか?