私はSpark 1.6.2、Scala 2.10.5、Java 1.7を使用しています。Spark dense_rankウィンドウ関数 - partitionBy節なし
私たちがpartitionBy句を使用せずに2億以上の行のデータセットに対してdense_rank()を実行する必要がある場合、orderBy句のみが使用されます。これは現在MSSQLで実行され、完了に約30分かかります。
下図のように私には、Sparkの同等のロジックを実装している:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
以下に示すように、私は糸クラスタモードでジョブをサブミットしています。私は2ノードのHadoop 2.6クラスタを持っていて、それぞれが4つのvCoresと32GBのメモリを持っています。ログに
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
、私は、MSSQLから約200ミルの列のテーブルが15分でスパークにインポート&キャッシュされたを取得していることがわかります。この段階まで約5 GBのメモリが使用されており、エグゼキュータの1つで約6.2 GBのメモリがまだフリーであり、他のエグゼキュータでは11 GBのメモリが空いていることがわかります。
しかし、dense_rank()のステップは、数分後に "GC Overhead limit exceeded"エラーで常に失敗します。上記のspark-submitコマンドでわかるように、ドライバメモリを7gまで設定しています。しかし、役に立たない! もちろん、私はpartitionBy節がないことが実際にSparkで問題を引き起こしていることを理解しています。しかし残念なことに、それは我々が対処する必要があるユースケースです。
ここでさらに光を当てていただけますか?何か不足していますか? Sparkでdense_rankウィンドウ関数を使用する代わりに使用できますか?たとえば、このフォーラムの他の専門家の一人が提案した "zipWithIndex"関数を使用すると、 dense_rankとは対照的に、 "zipWithIndex"メソッドがrow_number()関数を複製すると理解しているので、dense_rankと同じ結果が得られますか?
有用なアドバイスはありがとうございます。 ありがとう!
おかげで多くのことを見つけることができます! JDBCデータソースの「partitionColumn」オプションを使用して、MSSQLからのデータインポート時間を短縮できました。しかし、私がScalaにとって新しくなったので、partitionByのない高密度ランキングの推奨事項は私が消化するのにもっと時間が必要です。しかし、私に指導してくれてありがとう! – Prash