2016-01-22 9 views
7

SQLServerテーブルからDataFrameをロードしました。それは次のようになります。スパークデータフレームを日付でグループ化する

>>> df.show() 
+--------------------+----------+ 
|   timestamp| Value | 
+--------------------+----------+ 
|2015-12-02 00:10:...|  652.8| 
|2015-12-02 00:20:...|  518.4| 
|2015-12-02 00:30:...|  524.6| 
|2015-12-02 00:40:...|  382.9| 
|2015-12-02 00:50:...|  461.6| 
|2015-12-02 01:00:...|  476.6| 
|2015-12-02 01:10:...|  472.6| 
|2015-12-02 01:20:...|  353.0| 
|2015-12-02 01:30:...|  407.9| 
|2015-12-02 01:40:...|  475.9| 
|2015-12-02 01:50:...|  513.2| 
|2015-12-02 02:00:...|  569.0| 
|2015-12-02 02:10:...|  711.4| 
|2015-12-02 02:20:...|  457.6| 
|2015-12-02 02:30:...|  392.0| 
|2015-12-02 02:40:...|  459.5| 
|2015-12-02 02:50:...|  560.2| 
|2015-12-02 03:00:...|  252.9| 
|2015-12-02 03:10:...|  228.7| 
|2015-12-02 03:20:...|  312.2| 
+--------------------+----------+ 

今、私は時間(または日、または月か...)でグループ(和)の値したいと​​思いますが、私は実際にどのようにすることができますについての手掛かりを持っていません私がする。

これがDataFrameをロードする方法です。しかし、これは正しい方法ではないと感じています。

query = """ 
SELECT column1 AS timestamp, column2 AS value 
FROM table 
WHERE blahblah 
""" 

sc = SparkContext("local", 'test') 
sqlctx = SQLContext(sc) 

df = sqlctx.load(source="jdbc", 
       url="jdbc:sqlserver://<CONNECTION_DATA>", 
       dbtable="(%s) AS alias" % query) 

大丈夫ですか? 1.5.0スパーク日付とタイムスタンプに動作可能dayofmonthhourmonth又はyearなど多くの機能を提供するので

答えて

11

。したがって、timestampTimestampTypeの場合は、正しい表現が必要です。たとえば、次のように

from pyspark.sql.functions import hour, mean 

(df 
    .groupBy(hour("timestamp").alias("hour")) 
    .agg(mean("value").alias("mean")) 
    .show()) 

## +----+------------------+ 
## |hour|    mean| 
## +----+------------------+ 
## | 0|508.05999999999995| 
## | 1| 449.8666666666666| 
## | 2| 524.9499999999999| 
## | 3|264.59999999999997| 
## +----+------------------+ 

プリ1.5.0あなたの最良のオプションはselectExprのいずれかHiveContextとハイブUDFを使用することです:

df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum() 

## +----+---------+----------+ 
## |year|SUM(year)|SUM(value)| 
## +----+---------+----------+ 
## |2015| 40300| 9183.0| 
## +----+---------+----------+ 

または生のSQL:

df.registerTempTable("df") 

sqlContext.sql(""" 
    SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum 
    FROM df 
    GROUP BY MONTH(timestamp)""") 

はちょうどそれを覚えています外部ソースにプッシュダウンされないSparkによって集約が実行されます。通常は望ましい動作ですが、データ転送を制限するために集約をサブクエリとして実行したい場合があります。

+0

は、彼らのすべての機能monthofyearですか? –

0

また、date_formatを使用して任意の期間を作成することができます。 GROUPBY特定の日:

from pyspark.sql import functions as F 
 

 
df.select(F.date_format('timestamp','yyyy-MM-dd').alias('day')).groupby('day').count().show()

GROUPBY特定の月(ちょうどフォーマットを変更):

df.select(F.date_format('timestamp','yyyy-MM').alias('month')).groupby('month').count().show()

関連する問題