2016-11-22 7 views
0

私は、などのコメントは合計を展開していスパーク内の値の減少を特定する(外れ値)は

Movie Likes Comments Shares Views 
A  100  10  20  30 
A  102  11  22  35 
A  104  12  25  45 
A  *103* 13  *24* 50 
B  200  10  20  30 
B  205 *9*  21  35 
B  *203* 12  29  42 
B  210  13  *23* *39* 

好きなもののようなものがある数百万のレコードで設定された大規模なデータを持っており、それらが増加すると仮定されています。映画のためにこれに何かが落ちるなら、それは悪いデータが特定される必要があります。

私はgroupby映画についての最初の考えを持っていて、グループ内でソートします。私はスパーク1.6のデータフレームを処理に使用していますが、データフレーム内のグループ化されたデータ内にソートがないため、達成できないようです。

外れ値検出のための何かが別のアプローチかもしれませんが、時間制約のために私はまだそれを調べていません。

私はこれを達成できますか?

ありがとうございます!

答えて

1

あなたはスコープに以前の値をもたらすためにラグウィンドウ関数を使用することができます。

import org.apache.spark.sql.expressions.Window 
val windowSpec = Window.partitionBy('Movie).orderBy('maybesometemporalfield) 
dataset.withColumn("lag_likes", lag('Likes, 1) over windowSpec) 
     .withColumn("lag_comments", lag('Comments, 1) over windowSpec) 
     .show 

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-functions.html#lag

別のアプローチは、行番号を割り当てることであろう(すでに存在しない場合)、その列を遅らせてから、前の行に行を結合して、比較を実行できるようにします。

HTH

関連する問題