Spark SQLのWindow関数が正しく動作しないようです。同じdata_rfe_idを持つ複数のレコードがある場合は、次に取るSpark SQLのWindows関数からの予期しない結果の取得
: 私はHDFSのブロックサイズは128メガバイトと スパークバージョン1.5 CDH 5.5
私の要件であるHadoopクラスタで火花ジョブを実行しています最大SEQ_IDとmaxiumumのservice_idあたりのような単一のレコードは、私は、生データで同じdata_rfe_idと同じSEQ_IDといくつかのレコードがそう従ってがあることがわかり
、私は== IがROW_NUMでレコードをフィルタすることができるように窓関数を用いてROW_NUMBER適用しました= 1
しかし、巨大なデータセットを持っていると動作しないようです。同じrowNumberが適用されています。
なぜこのようなことが起こっていますか?
データフレームにウィンドウ機能を適用するには、再シャッフルする必要がありますか?
は、私はこれを達成するための唯一の窓関数を使用する各data_rfe_id
に独自のランク番号を期待しています。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
.....
scala> df.printSchema
root
|-- transitional_key: string (nullable = true)
|-- seq_id: string (nullable = true)
|-- data_rfe_id: string (nullable = true)
|-- service_id: string (nullable = true)
|-- event_start_date_time: string (nullable = true)
|-- event_id: string (nullable = true)
val windowFunction = Window.partitionBy(df("data_rfe_id")).orderBy(df("seq_id").desc,df("service_id").desc)
val rankDF =df.withColumn("row_num",rowNumber.over(windowFunction))
rankDF.select("data_rfe_id","seq_id","service_id","row_num").show(200,false)
期待される結果:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |2 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |2 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |3 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |4 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |5 |
私は上記のコードあたりのようだ実際の結果:
+------------------------------------+-----------------+-----------+-------+
|data_rfe_id |seq_id |service_id|row_num|
+------------------------------------+-----------------+-----------+-------+
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695826 |4039 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695821 |3356 |1 |
|9ih67fshs-de11-4f80-a66d-b52a12c14b0e|1695802 |1857 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2156 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2103 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2083 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2082 |1 |
|23sds222-9669-429e-a95b-bc984ccf0fb0 |1695541 |2076 |1 |
私はこれらの予期しない結果を取得していますなぜ誰かが私に説明してもらえますか?どのように解決するのですか?
Whe n私はHiveContextを使用しましたが、私のコードは機能しましたが、いくつかのtask_idだけで別の例外がスローされました。 java.lang.ClassCastException:org.apache.spark.unsafe.types.UTF8Stringはjava.lang.Integerにキャストできません \t scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) で例外が発生しました。 \t at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow $ class.getInt(rows.scala:40) \t at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220 ) –