Pysparkの分類子の入力データを準備しています。 SparkSQLで集合関数を使用して平均や分散などの特徴を抽出しています。これらは、アクティビティ、名前、ウィンドウでグループ化されています。ウィンドウは、UNIXのタイムスタンプを10000で割って10秒の時間ウィンドウに分割して計算されています。Pysparkユーザ定義集計計算カラム
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
この結果は、私が今、何をしたいか
Activity Name Window AvgX VarX
Walk accelerometer 95875 2.0 1.0
ようになり、私はタイムスタンプを必要とする。これにX.
に各点の平均勾配を計算することで、私はPythonで配列を使ってロジックを実装しました。これは、各点の間の勾配を計算し、平均勾配を得るようなものです。理想的には、PysparkでまだサポートされていないUDAFでこれを行いたいと思います。 (それは次のようになり、その後、SQLにあなたが
EDITを行うことができ、以下の機能が坂と呼ばれていた場合と言う - 。それが明確になるように入力を変更し だから、私は正確にとの間の傾きを計算しているやっているものを。その後、各点、そしてそのウィンドウで斜面の平均を返します。私は、各ウィンドウの平均と分散を取得していますように、私はまた、平均勾配を取得したい。
#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]
values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02]
i = 0;
slope = 0.0;
totalSlope = 0.0;
while (i < len(timestamp) - 1):
y2 = values[i+1];
y1 = values[i];
x2 = timestamp[i + 1];
x1 = timestamp[i];
slope = ((y2-y1)/(x2-x1));
totalSlope = totalSlope + slope;
i=i+1
avgSlope = (totalSlope/len(x_values))
どのように私はこれを実装することができます私はパンダのデータフレームに変換しようとするといいでしょうか?もしそうなら、どうすればデータが正しくマッピングされているかを確認し、GROUP BYの動作SQLクエリの名前ウィンドウ。
これは間違いなくUDAFの仕事ではありません。 – zero323
@ zero323これにどうやってアプローチしますか? – other15
連続する点の勾配を計算し、次に単純な平均を取る。しかしここでの入力の説明はむしろあいまいです。期待される出力のサンプルデータを投稿できますか? – zero323