2017-10-23 5 views
2

私のPySparkコードでは、DataFrameにセンサからのデータが入力され、各行にタイムスタンプ、event_description、event_valueがあります。 各センサーイベントは、idと値で定義される測定値で構成されます。私が持っている唯一の保証は、単一のイベントに関連するすべての「フェーズ」が2つの行(ソートされていない)の間に含まれていることです。 各イベント「ブロック」内には、EV_CODEに関連付けられた値であるイベントラベルがあります。行の値を区切り文字で区切って火花データフレームを分割します

+-------------------------+------------+-------------+ 
| timestamp    | event_id | event_value | 
+-------------------------+------------+-------------+ 
| 2017-01-01 00:00:12.540 | EV_SEP  | -----  | 
+-------------------------+------------+-------------+ 
| 2017-01-01 00:00:14.201 | EV_2  | 10   | 
+-------------------------+------------+-------------+ 
| 2017-01-01 00:00:13.331 | EV_1  | 11   | 
+-------------------------+------------+-------------+ 
| 2017-01-01 00:00:15.203 | EV_CODE | ABC   | 
+-------------------------+------------+-------------+ 
| 2017-01-01 00:00:16.670 | EV_SEP  | -----  | 
+-------------------------+------------+-------------+ 

私はすべてのイベントがそのラベルに関連付けられていることを知っているように私は、そのラベルを含む新しい列を作成したいと思います:パンダで

+-------------------------+----------+-------------+------------+ 
| timestamp    | event_id | event_value | event_code | 
+-------------------------+----------+-------------+------------+ 
| 2017-01-01 00:00:12.540 | EV_SEP | -----  | ABC  | 
+-------------------------+----------+-------------+------------+ 
| 2017-01-01 00:00:14.201 | EV_2  | 10   | ABC  | 
+-------------------------+----------+-------------+------------+ 
| 2017-01-01 00:00:13.331 | EV_1  | 11   | ABC  | 
+-------------------------+----------+-------------+------------+ 
| 2017-01-01 00:00:15.203 | EV_CODE | ABC   | ABC  | 
+-------------------------+----------+-------------+------------+ 
| 2017-01-01 00:00:16.670 | EV_SEP | -----  | ABC  | 
+-------------------------+----------+-------------+------------+ 

を、私は簡単のインデックスを取得することができますテーブルをブロックに分割し、各ブロックからEV_CODEを取り込み、その値を持つevent_code列を作成します。

可能な解決策は、次のようになりますタイムスタンプ

  • に係るデータフレームがRDDにデータフレームを変換しEV_SEP
  • を含むインデックスが(ブロック範囲を計算取得
  • zipWithIndexを呼び出す

    • ソートstart_index、end_index)
    • EV_CODEを抽出する単一のチャンク(インデックスでフィルタリング)を処理する
    • は最終的に必要な列を作成します

    この問題を解決する方法はありますか?

  • 答えて

    1
    from pyspark.sql import functions as f 
    

    サンプルデータ:

    df_idx = df.filter(df['event_id'] == 'EV_SEP') \ 
        .withColumn('idx', f.row_number().over(Window.partitionBy().orderBy(df['timestamp']))) 
    df_block = df.filter(df['event_id'] != 'EV_SEP').withColumn('idx', f.lit(0)) 
    

    'スプレッド' インデックス:

    EV_CODEを追加します。

    df.show() 
    
    +-----------------------+--------+-----------+ 
    |timestamp    |event_id|event_value| 
    +-----------------------+--------+-----------+ 
    |2017-01-01 00:00:12.540|EV_SEP |null  | 
    |2017-01-01 00:00:14.201|EV_2 |10   | 
    |2017-01-01 00:00:13.331|EV_1 |11   | 
    |2017-01-01 00:00:15.203|EV_CODE |ABC  | 
    |2017-01-01 00:00:16.670|EV_SEP |null  | 
    |2017-01-01 00:00:20.201|EV_2 |10   | 
    |2017-01-01 00:00:24.203|EV_CODE |DEF  | 
    |2017-01-01 00:00:31.670|EV_SEP |null  | 
    +-----------------------+--------+-----------+ 
    

    インデックスが追加

    01最後に
    df_code = df.filter(df['event_id'] == 'EV_CODE').withColumnRenamed('event_value', 'event_code') 
    df = df.join(df_code, on=[df['idx'] == df_code['idx']]) \ 
        .select(df['timestamp'], df['event_id'], df['event_value'], df_code['event_code']) 
    

    +-----------------------+--------+-----------+----------+ 
    |timestamp    |event_id|event_value|event_code| 
    +-----------------------+--------+-----------+----------+ 
    |2017-01-01 00:00:12.540|EV_SEP |null  |ABC  | 
    |2017-01-01 00:00:13.331|EV_1 |11   |ABC  | 
    |2017-01-01 00:00:14.201|EV_2 |10   |ABC  | 
    |2017-01-01 00:00:15.203|EV_CODE |ABC  |ABC  | 
    |2017-01-01 00:00:16.670|EV_SEP |null  |DEF  | 
    |2017-01-01 00:00:20.201|EV_2 |10   |DEF  | 
    |2017-01-01 00:00:24.203|EV_CODE |DEF  |DEF  | 
    +-----------------------+--------+-----------+----------+ 
    
    0

    新しいHadoop InputFormatを作成することで、ここで目標を達成するために計算上効率的な方法になります(ただし、コード上では同じまたはより多くの体操です)。 the Python APIsc.hadoopFileを使用して、代わりのHadoop入力フォーマットを指定することはできますが、JavaフォーマットからPythonへの変換を行う必要があります。フォーマットを指定することができます。 PySparkで利用できるコンバーターは比較的少ないですが、this referenceAvro converter as an exampleを使用することを提案しています。また、カスタムHadoop入力フォーマットの出力テキストを出力して、Pythonでさらに解析してコンバーターを実装する問題を回避すると便利かもしれません。

    これを実行したら、改行文字の代わりにEV_SEPの特別な行を特殊文字(JavaまたはScalaでHadoop APIを使用)で作成してレコード区切り文字として扱います。アキュムレータで読み取った行を集めることで(単にArrayListが概念実証として可能)、2つの行が連続して見つかったときにレコードの累積リストを出力することで、これを非常に簡単に行うことができます。

    このような設計の基礎としてTextInputFormatを使用すると魅力的かもしれませんが、入力形式ではこれらのファイルを改行文字で任意に分割し、ファイルの分割を適切にサポートするカスタムロジックを実装する必要があります。あるいは、ファイル分割を実装しないだけで問題を回避することもできます。これはパーティショナーの簡単な変更です。あなたはファイルを分割する必要がある場合

    、基本的な考え方は次のとおりです。

    • 均等部分にファイルを分割することによりオフセットスプリットを選ぶ
    • オフセット
    • へシーク文字バイバックシーク列に2列タイプEV_SEPと、この場合、(デリミタシーケンスが見つかった場合にオフセットから文字。

    Fiの周りのエッジケースのためにこれらの配列を検出します分裂が課題になります。私は行の最大のバイト幅を確立し、適切な幅(基本的に2倍の行のサイズ)のスライディングウィンドウチャンクを出発点から逆方向に読み込み、プリコンパイルされたJava正規表現とMatcherを使用してそれらのウィンドウと照合することをお勧めします。これはSequence Files find their sync marksと似ていますが、厳密な等価性ではなく正規表現を使用して検出します。

    タイムスタンプでDataFrameを並べ替えると、同じ時間帯に発生するイベントの内容が異なるファイルで変更される可能性があります。

    関連する問題