2016-08-04 8 views
1

タイムスタンプ、アセット(文字列)、タグ(文字列)、および値(ダブル)の列を持つスパークデータフレーム(スカラインターフェイスを使用)があります。ここでの抜粋です:フィルタスパークリスト内の任意の点集合間のデータフレーム

+--------------------+-----+--------+-------------------+ 
|   timestamp|asset|  tag|    value| 
+--------------------+-----+--------+-------------------+ 
|2013-01-03 23:36:...| G4| BTGJ2_2|  116.985626221| 
|2013-01-15 00:36:...| G4| TTXD1_6|  66.887382507| 
|2013-01-05 13:03:...| G4|TTXD1_22|  40.913497925| 
|2013-01-12 04:43:...| G4|TTXD1_23|  60.834510803| 
|2013-01-08 17:54:...| G4| LTB1D|  106.534744263| 
|2013-01-02 04:15:...| G4| WEXH|  255.981292725| 
|2013-01-07 10:54:...| G4| BTTA1_7|  100.743843079| 
|2013-01-05 11:29:...| G4| CDFH_10|  388.560668945| 
|2013-01-10 09:10:...| G4| LTB1D|  112.226242065| 
|2013-01-13 15:09:...| G4|TTXD1_15|  63.970848083| 
|2013-01-15 01:23:...| G4| TTIB|  67.993904114| 

私はまた、各Listはサイズ2であり、関心の間隔で開始時間と終了を保持Array[List[Timestamp]]を、持っています。例:2013年1月12日に6時00分に2013年1月2日の12:00まで深夜から1、深夜2013年1月10日から別の

event_times: Array[List[java.sql.Timestamp]] = Array(List(2013-01-02 00:00:00.0, 2013-01-02 12:00:00.0), List(2013-01-10 00:00:00.0, 2013-01-12 06:00:00.0)) 

は、関心のある2つの間隔を保持しています

ここに私の質問です:タイムスタンプがの間隔のいずれかのにあるような値を返すようにデータフレームをフィルタリングするにはどうすればよいですか?いずれかの区間については、私は

df.filter(df("timestamp").between(start, end)) 

を行うことができ、私は(私が持っているどのように多くの間隔)Arrayしているどのように多くの要素がわからないので、私は、フィルタの長いシリーズを持つことはできません。上記の例の場合

、私は私が今持っていることはArrayをループであり、各1のための適切なサブセットを取得しています行4、6を維持したい、と9

でしょう。しかし、それはおそらくすべてが大きなフィルタの権利にあるよりも遅いですか?

+1

申し訳ありませんが、私はあなたが欲しいものを得ることができませんでした。あなたは明確な例を挙げることができますか? –

+0

@ThiagoBaldim私は例をもって更新しました。 – kgully

答えて

3

タイムスタンプリストをDataFrameに変換し、対応するタイムスタンプで初期DataFrameと結合することができます。このプロセスを説明するための簡単な例を作成しました。

//Dummy data 
val data = List(
    ("2013-01-02 00:30:00.0", "116.985626221"), 
    ("2013-01-03 00:30:00.0", "66.887382507"), 
    ("2013-01-11 00:30:00.0", "12.3456") 
) 

//Convert data to DataFrame 
val dfData = sc.parallelize(data).toDF("timestamp", "value") 

//Timestamp intervals list 
val filterList = Array(
    List("2013-01-02 00:00:00.0", "2013-01-02 12:00:00.0"), 
    List("2013-01-10 00:00:00.0", "2013-01-12 06:00:00.0") 
) 

//Convert the intervals list to a DataFrame 
val dfIntervals = sc.parallelize(
    filterList.map(l => (l(0),l(1))) 
).toDF("start_ts","end_ts") 

//Join both dataframes (inner join, since you only want matching rows) 
val joined = dfData.as("data").join(
    dfIntervals.as("inter"), 
    $"data.timestamp".between($"inter.start_ts", $"inter.end_ts") 
) 
関連する問題