2017-07-01 3 views
0

私は以下のような構造のcsvファイルを持っています。大規模なデータセットの時間間隔に渡って平均価格を得る

INDEX,SYMBOL,DATETIMETS,PRICE,SIZE 
0,A,2002-12-02 9:30:20,19.75,30200 
1,A,2002-12-02 9:30:22,19.75,100 
2,A,2002-12-02 9:30:22,19.75,300 
3,A,2002-12-02 9:30:22,19.75,100 
4,A,2002-12-02 9:30:23,19.75,100 
5,A,2002-12-02 9:30:23,19.75,100 
6,A,2002-12-02 9:30:23,19.75,100 
7,A,2002-12-02 9:30:23,19.75,100 
....... 
....... 

何年にもわたる100万行以上があります。 私はこのcsvファイルをスパークデータフレーム(pyspark)にロードしました。 5分間隔で価格の平均を得るための最も簡単な方法は何ですか?

私が現在行っていることは、データセット全体をループし、5分間隔で時間を照会することです。例:

filteredSqlString = ("SELECT PRICE FROM DF WHERE DATETIMETS >= '" + str(sdt) + "'" 
         + " AND DATETIMETS < '" + str(idt) + "'") 
filtered_df = sqlContext.sql(filteredSqlString); 
MEAN_PRICE = filtered_df.select([mean("PRICE")]).first()[0]; 

と、このアプローチは永遠に取って、開始日時と終了日時

sdt = idt; 
idt = sdt + timedelta(minutes=5); 

をインクリメントすることによりループでこれを実行しています。これを達成するためのより速い方法がありましたか?

答えて

1

私はこれがずっと良い解決策であるべきだと思います。

schema = StructType([ 
    StructField("INDEX", IntegerType(), True), 
    StructField("SYMBOL", StringType(), True), 
    StructField("DATETIMETS", StringType(), True), 
    StructField("PRICE", DoubleType(), True), 
    StructField("SIZE", IntegerType(), True), 
]) 

df = spark\ 
    .createDataFrame(
     data=[(0,'A','2002-12-02 9:30:20',19.75,30200), 
      (1,'A','2002-12-02 9:31:20',19.75,30200), 
      (2,'A','2002-12-02 9:35:20',19.75,30200), 
      (3,'A','2002-12-02 9:36:20',1.0,30200), 
      (4,'A','2002-12-02 9:41:20',20.0,30200), 
      (4,'A','2002-12-02 9:42:20',40.0,30200), 
      (5,'A','2003-12-02 11:28:20',19.75,30200), 
      (6,'A','2003-12-02 11:31:20',19.75,30200), 
      (7,'A','2003-12-02 12:35:20',19.75,30200), 
      (8,'A','2004-12-02 10:36:20',1.0,30200), 
      (9,'A','2006-12-02 22:41:20',20.0,30200), 
      (10,'A','2006-12-02 22:42:20',40.0,30200)], 
     schema=schema) 

は、私たちの関心間隔を作成してみましょう:

ある
intervals = [] 
for i in range(0,61,5): 
    intervals.append(i) 
print(intervals) 

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60] 

その後いくつかのUDFの私たちは、グループ化のために必要となる:

いくつかの入力を考えると

u_get_year = udf(lambda col : col[:10]) 
u_get_hour = udf(lambda col : col.strip().split(" ")[1].split(':')[0], StringType()) 

def get_interval(col): 
    curr = int(col.strip().split(" ")[1].split(':')[1]) 

    for idx,interval in enumerate(intervals): 
     if intervals[idx] <= curr < intervals[idx+1]: 
      return "{}-{}".format(intervals[idx],intervals[idx+1]) 

    return "" 

u_get_interval = udf(get_interval, StringType()) 

最後の操作を実行してみましょう:

df2 = df.withColumn('DATE',u_get_year('DATETIMETS'))\ 
     .withColumn('HOUR', u_get_hour('DATETIMETS'))\ 
     .withColumn('INTERVAL', u_get_interval('DATETIMETS'))\ 
     .drop('DATETIMETS') 

df2.groupBy('DATE', 'HOUR', 'INTERVAL').agg(mean('PRICE'))\ 
     .orderBy('DATE', 'HOUR', 'INTERVAL').show() 

出力:ご返信用

+----------+----+--------+----------+ 
|DATE  |HOUR|INTERVAL|avg(PRICE)| 
+----------+----+--------+----------+ 
|2002-12-02|9 |30-35 |19.75  | 
|2002-12-02|9 |35-40 |10.375 | 
|2002-12-02|9 |40-45 |30.0  | 
|2003-12-02|11 |25-30 |19.75  | 
|2003-12-02|11 |30-35 |19.75  | 
|2003-12-02|12 |35-40 |19.75  | 
|2004-12-02|10 |35-40 |1.0  | 
|2006-12-02|22 |40-45 |30.0  | 
+----------+----+--------+----------+ 
+0

感謝。 – Bookamp

関連する問題