2017-09-30 6 views
0

Spark SQLのWindow関数が正しく動作しないようです。同じdata_rfe_idを持つ複数のレコードがある場合は、次に取るSpark SQLのWindows関数からの予期しない結果の取得

: 私はHDFSのブロックサイズは128メガバイトと スパークバージョン1.5 CDH 5.5

私の要件であるHadoopクラスタで火花ジョブを実行しています最大SEQ_IDとmaxiumumのservice_idあたりのような単一のレコードは、私は、生データで同じdata_rfe_idと同じSEQ_IDといくつかのレコードがそう従ってがあることがわかり

、私は== IがROW_NUMでレコードをフィルタすることができるように窓関数を用いてROW_NUMBER適用しました= 1

しかし、巨大なデータセットを持っていると動作しないようです。同じrowNumberが適用されています。

なぜこのようなことが起こっていますか?

データフレームにウィンドウ機能を適用するには、再シャッフルする必要がありますか?

は、私はこれを達成するための唯一の窓関数を使用する各data_rfe_id

に独自のランク番号を期待しています。

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.rowNumber 
..... 

scala> df.printSchema 
root 
|-- transitional_key: string (nullable = true) 
|-- seq_id: string (nullable = true) 
|-- data_rfe_id: string (nullable = true) 
|-- service_id: string (nullable = true) 
|-- event_start_date_time: string (nullable = true) 
|-- event_id: string (nullable = true) 


val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc) 
    val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction)) 
    rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false) 

期待される結果:

+------------------------------------+-----------------+-----------+-------+ 
    |data_rfe_id       |seq_id   |service_id|row_num| 
+------------------------------------+-----------------+-----------+-------+ 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826   |4039  |1  | 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821   |3356  |2  | 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802   |1857  |3  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2156  |1  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2103  |2  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2083  |3  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2082  |4  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2076  |5  | 

私は上記のコードあたりのようだ実際の結果:

+------------------------------------+-----------------+-----------+-------+ 
|data_rfe_id       |seq_id   |service_id|row_num| 
+------------------------------------+-----------------+-----------+-------+ 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826   |4039  |1  | 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821   |3356  |1  | 
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802   |1857  |1  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2156  |1  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2103  |1  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2083  |1  | 
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2082  |1  | 
    |23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541   |2076  |1  | 

私はこれらの予期しない結果を取得していますなぜ誰かが私に説明してもらえますか?どのように解決するのですか?

答えて

1

基本的には、seq_idとservice_idを降順で並べ替える必要があります。 rangeBetweenを必要な範囲で追加してください。ランクはあなたのために働くかもしれません。次のコードの抜粋です:あなたが火花古いバージョンのそれは仕事をしたりしませんかわからないを使用していると

val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id"),df("service_id")).desc().rangeBetween(-MAXNUMBER,MAXNUMBER)) 
val rankDF =df.withColumn("rank", rank().over(windowFunction)) 

。 windowSpecに問題がありますreference

+0

Whe n私はHiveContextを使用しましたが、私のコードは機能しましたが、いくつかのtask_idだけで別の例外がスローされました。 java.lang.ClassCastException:org.apache.spark.unsafe.types.UTF8Stringはjava.lang.Integerにキャストできません \t scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) で例外が発生しました。 \t at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow $ class.getInt(rows.scala:40) \t at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220 ) –

関連する問題