2017-04-14 3 views
0

データセットの上位n項目を取得しようとしています。スパークウィンドウ関数上位N項目のパフォーマンスの問題

最初はこれを行いました。

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val resultDf =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER").where($"ROWNUMBER" <= 2) 

しかし、私はだから私は、しかし、次の

var df = Seq((1 , "row1") , (2,"row2"), (1,"row11") , (1 , null)).toDF() 

df=df.select($"_1".alias("p_int"), $"_2".alias("p_string")) 

val test =df.where($"p_string".isNotNull).select($"p_int" ,$"p_int" +1 , upper($"p_string") , rank().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "RANKINDEX", row_number().over(Window.partitionBy($"p_int").orderBy($"p_string")) as "ROWNUMBER") 

implicit val encoder = RowEncoder(test.schema) 

var temp =test.mapPartitions(_.take(2)) 

を行うことを決めた "($" ROWNUMBER "< = 10)"

操作のパフォーマンスコストを回避したいです、私のテストは、これが正しい出力を生成しないことを示すようです。

どのような考えがありますか。ウィンドウデータセットから取得したイテレータのtake関数がイテレータの最初のn個の要素を取得しないのでしょうか?

答えて

0

Datasetのパーティションは、PARTITION BYと一対一で対応していません。 OVER (PARTITION BY ...)のすべての魔法ははるかに低いレベルで起こり、単一の物理パーティションが複数のIDを処理します。

また、実際に作業を保存していません。 row_numbersを正しく生成するにはSparkはすべてのデータをシャッフル、並べ替え、スキャンする必要があります。完全シャッフルとソートを避けるためには、より低いレベルのメカニズムが必要です(例えば、バイナリヒープでAggregator)。

関連する問題