3

Pysparkの分類子の入力データを準備しています。 SparkSQLで集合関数を使用して平均や分散などの特徴を抽出しています。これらは、アクティビティ、名前、ウィンドウでグループ化されています。ウィンドウは、UNIXのタイムスタンプを10000で割って10秒の時間ウィンドウに分割して計算されています。Pysparkユーザ定義集計計算カラム

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window") 

この結果は、私が今、何をしたいか

Activity Name   Window  AvgX  VarX 
Walk accelerometer 95875  2.0   1.0 

ようになり、私はタイムスタンプを必要とする。これにX.

に各点の平均勾配を計算することで、私はPythonで配列を使ってロジックを実装しました。これは、各点の間の勾配を計算し、平均勾配を得るようなものです。理想的には、PysparkでまだサポートされていないUDAFでこれを行いたいと思います。 (それは次のようになり、その後、SQLにあなたが​​

EDITを行うことができ、以下の機能が坂と呼ばれていた場合と言う - 。それが明確になるように入力を変更し だから、私は正確にとの間の傾きを計算しているやっているものを。その後、各点、そしてそのウィンドウで斜面の平均を返します。私は、各ウィンドウの平均と分散を取得していますように、私はまた、平均勾配を取得したい。

#sample input 
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529] 

values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02] 

i = 0; 
slope = 0.0; 
totalSlope = 0.0; 

while (i < len(timestamp) - 1): 
    y2 = values[i+1]; 
    y1 = values[i]; 

    x2 = timestamp[i + 1]; 
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope; 
    i=i+1 

avgSlope = (totalSlope/len(x_values)) 

どのように私はこれを実装することができます私はパンダのデータフレームに変換しようとするといいでしょうか?もしそうなら、どうすればデータが正しくマッピングされているかを確認し、GROUP BYの動作SQLクエリの名前ウィンドウ。

+0

これは間違いなくUDAFの仕事ではありません。 – zero323

+0

@ zero323これにどうやってアプローチしますか? – other15

+0

連続する点の勾配を計算し、次に単純な平均を取る。しかしここでの入力の説明はむしろあいまいです。期待される出力のサンプルデータを投稿できますか? – zero323

答えて

4

一般に、UDAFは注文を定義する手段を提供していないため、これはUDAFの仕事ではありません。ここに本当に必要なのは、ウィンドウ関数と標準集計の組み合わせです。

from pyspark.sql.functions import col, lag, avg 
from pyspark.sql.window import Window 

df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
## timestamp: bigint, value: float] 

group = ["activity", "name", "window"] 

w = (Window() 
    .partitionBy(*group) 
    .orderBy("timestamp")) 

v_diff = col("value") - lag("value", 1).over(w) 
t_diff = col("timestamp") - lag("timestamp", 1).over(w) 

slope = v_diff/t_diff 

df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope"))) 
+0

これは良いアプローチのようですが、withColumnは "傾き"を無視して、代わりに平均の値を返します。関数を使用することができますか? – other15

+0

Typo。これは、勾配in agg節でなければなりません。 – zero323