2017-08-12 20 views
1

の火付け役、私はフォーマットの入力データフレーム計算合計、複数のトップK値のカウント

+---------------------------------+ 
|name| values |score |row_number| 
+---------------------------------+ 
|A |1000 |0  |1  | 
|B |947 |0  |2  | 
|C |923 |1  |3  | 
|D |900 |2  |4  | 
|E |850 |3  |5  | 
|F |800 |1  |6  | 
+---------------------------------+ 

を持っていますデータフレームの上位k個の値がscore> 0の場合はすべての値のうちの1つを返します。

私はしかし、私は2500 ......トップ100200300のためのデータをフェッチする必要がトップ100の値

val top_100_data = df.select(
     count(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("count_100"), 
     sum(when(col("score") > 0 and col("row_number")<=100, col("values"))).alias("sum_filtered_100"), 
     sum(when(col("row_number") <=100, col(values))).alias("total_sum_100") 
    ) 

ための次のクエリを実行することによって、これを達成することができています。このクエリを25回実行し、最後に25のデータフレームを実行する必要があることを意味します。

私はスパークし、まだ多くのことを考え出して新しくなりました。この問題を解決する最良の方法は何でしょうか?

ありがとうございます!

答えて

1

あなたは次にあなたがtopFilters配列をループして、必要なdataframeを作成することができます

val topFilters = Array(100, 200, 300) // you can add more 

としての限界のArrayを作成することができます。 joinの代わりにunionを使用することをお勧めします。joinは、別途columnsunionsとなり、別途rowsとなります。あなたのdataframeあなたはあなたの最終dataframe

+------+-------------+-------------+-------------+ |rowNum|total_sum_100|total_sum_200|total_sum_300| +------+-------------+-------------+-------------+ |1 |923 |1773 |3473 | +------+-------------+-------------+-------------+ 

などを与える必要がありますどの

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 
var finalDF : DataFrame = Seq("1").toDF("rowNum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit("1").as("rowNum"), sum(when(col("score") > 0 && col("row_number") < k, col("values"))).alias(s"total_sum_$k")) 
    finalDF = finalDF.join(top_100_data, Seq("rowNum")) 
} 
finalDF.show(false) 

上記のように定義topFilters配列を使用して行うことができます

+----+------+-----+----------+ 
|name|values|score|row_number| 
+----+------+-----+----------+ 
|A |1000 |0 |1   | 
|B |947 |0 |2   | 
|C |923 |1 |3   | 
|D |900 |2 |200  | 
|E |850 |3 |150  | 
|F |800 |1 |250  | 
+----+------+-----+----------+ 

として考えると、次の

を行うことができます

あなたあなたが持っているあなたの25の限界のために同じことをすることができます。

unionを使用する場合は、上記の考え方に似ています。

私の答えは、あなたが労働組合を必要とする場合、あなたはあなたの

を与える必要があります

var finalDF : DataFrame = Seq((0, 0, 0, 0)).toDF("limit", "count", "sum_filtered", "total_sum") 
for(k <- topFilters) { 
    val top_100_data = df.select(lit(k).as("limit"), count(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("count"), 
    sum(when(col("score") > 0 and col("row_number")<=k, col("values"))).alias("sum_filtered"), 
    sum(when(col("row_number") <=k, col("values"))).alias("total_sum")) 
    finalDF = finalDF.union(top_100_data) 
} 
finalDF.filter(col("limit") =!= 0).show(false) 

上で定義された同じ制限アレイと、次のロジックを適用することができ

を更新役立つ

であると思います

+-----+-----+------------+---------+ |limit|count|sum_filtered|total_sum| +-----+-----+------------+---------+ |100 |1 |923 |2870 | |200 |3 |2673 |4620 | |300 |4 |3473 |5420 | +-----+-----+------------+---------+ 
+0

こんにちは!答えてくれてありがとう、これはとても役に立ちました!!だから私はすべてのK(sum_100_filtered_score、total_sum_100、count_filtered_score_100)の3つの列が必要になります。データセットに参加することで、各フィールドに1つの列が得られます。だから私は組合を使ってみようとしているのです – Vignesh

+0

それでは。 :)結合の代わりに、あなたは共用体を使うことができます。 –

+0

私の更新された答えを見てください:)あなたが本当にあなたを助けた場合、あなたは受け入れてupvoteすることができます –

関連する問題