2017-05-30 8 views
2

私はSpark 2.1を使用しています。Spark SQLソート順はGroupByと集約で保持されませんか?

私は、次の例を実行する場合:

val seq = Seq((123,"2016-01-01","1"),(123,"2016-01-02","2"),(123,"2016-01-03","3")) 

val df = seq.toDF("id","date","score") 

val dfAgg = df.sort("id","date").groupBy("id").agg(last("score")) 

dfAgg.show 
dfAgg.show 
dfAgg.show 
dfAgg.show 
dfAgg.show 

上記のコードの出力は次のとおりです。

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     1| 
+---+------------------+ 

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     2| 
+---+------------------+ 

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     1| 
+---+------------------+ 

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     3| 
+---+------------------+ 

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     3| 
+---+------------------+ 

意図は各IDの最新の日付に関連付けられたスコアを取得することでした

+---+------------------+ 
| id|last(score, false)| 
+---+------------------+ 
|123|     3| 
+---+------------------+ 

しかし、これは明らかに結果が非確定的であるために機能しませんでした。これを実現するためにウィンドウ関数を使用する必要がありますか?

+1

を試すことができますか? –

+0

スパークのバージョンはありますか? – hadooper

+0

は、スパーク2.1の問題を再生成しようとしました。一貫した結果を得ました。 show operationを実行する前に "dfAgg.cache"を試してみてください。矛盾がまだあるかどうか教えてください。 – hadooper

答えて

2

org.apache.spark.sql.catalyst.expressions.aggregate.Lastのためのドキュメントを見てみると:

/** 
* Returns the last value of `child` for a group of rows. If the last value of `child` 
* is `null`, it returns `null` (respecting nulls). Even if [[Last]] is used on an already 
* sorted column, if we do partial aggregation and final aggregation (when mergeExpression 
* is used) its result will not be deterministic (unless the input table is sorted and has 
* a single partition, and we use a single reducer to do the aggregation.). 
*/ 

が、残念ながら、これは動作が期待されていることを示しています。

私の質問に答えて、今のところは、SPARK DataFrame: select the first row of each groupが説明されているように、ウィンドウ機能のように思えます。

0

あなたはjavadocが与えられた式でソートされた新しいデータセットを返します*彼らは/ **

同じと言うにもかかわらずorderByの代わりsortで試すことができます。 *これはsort関数のエイリアスです。 * * @group typedrel * @since 2.0.0 scala.annotation.varargs DEF ORDERBY @ / (SORTCOL:文字列、sortCols:文字列):データセット[T] =ソート(SORTCOL、sortCols:_ *)

あなたは同様にあなたの期待出力を更新することができ

val dfAgg = df.orderBy("id","date").groupBy("id").agg(last("score")) 
関連する問題