2017-01-02 13 views
2

私はSpark 1.6.2、Scala 2.10.5、Java 1.7を使用しています。Spark dense_rankウィンドウ関数 - partitionBy節なし

私たちがpartitionBy句を使用せずに2億以上の行のデータセットに対してdense_rank()を実行する必要がある場合、orderBy句のみが使用されます。これは現在MSSQLで実行され、完了に約30分かかります。

下図のように私には、Sparkの同等のロジックを実装している:

val df1 = hqlContext.read.format("jdbc").options(
    Map("url" -> url, "driver" -> driver, 
    "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load() 

df1.cache() 

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId"))) 

以下に示すように、私は糸クラスタモードでジョブをサブミットしています。私は2ノードのHadoop 2.6クラスタを持っていて、それぞれが4つのvCoresと32GBのメモリを持っています。ログに

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar 

、私は、MSSQLから約200ミルの列のテーブルが15分でスパークにインポート&キャッシュされたを取得していることがわかります。この段階まで約5 GBのメモリが使用されており、エグゼキュータの1つで約6.2 GBのメモリがまだフリーであり、他のエグゼキュータでは11 GBのメモリが空いていることがわかります。

しかし、dense_rank()のステップは、数分後に "GC Overhead limit exceeded"エラーで常に失敗します。上記のspark-submitコマンドでわかるように、ドライバメモリを7gまで設定しています。しかし、役に立たない! もちろん、私はpartitionBy節がないことが実際にSparkで問題を引き起こしていることを理解しています。しかし残念なことに、それは我々が対処する必要があるユースケースです。

ここでさらに光を当てていただけますか?何か不足していますか? Sparkでdense_rankウィンドウ関数を使用する代わりに使用できますか?たとえば、このフォーラムの他の専門家の一人が提案した "zipWithIndex"関数を使用すると、 dense_rankとは対照的に、 "zipWithIndex"メソッドがrow_number()関数を複製すると理解しているので、dense_rankと同じ結果が得られますか?

有用なアドバイスはありがとうございます。 ありがとう!

答えて

2

二つの異なる問題がここにあります

  • あなたはパーティション化列またはパーティション述語を設けることなく、JDBC接続を介してデータをロードします。これは、単一のエグゼキュータ・スレッドを使用してすべてのデータをロード

    この問題は、通常、既存の列のいずれかを使用するか、人工キーを使用して解決するのが非常に簡単です。

  • partitionByを使用せずにウィンドウ関数を使用します。結果として、すべてのデータが単一のパーティションに再シャッフルされ、ローカルでソートされ、単一のスレッドを使用して処理されます。一般的に

    のみDataset APIを使用していますが、使用することができますいくつかのトリックがあることに対処することができます普遍的な解決策はありません。

    • が必要なレコードの順序を反映した人工的なパーティションを作成するには。 Avoid performance impact of a single partition mode in Spark window functions

      あなたの場合も同様の方法を使用できますが、以下に説明するものと同等のマルチステッププロセスが必要になります。あなたはソートRDD(同様Datasetから変換せずに同様のことを行うことが可能であるべきである)と、追加のアクションにわたって2回の別々のスキャンを使用することができ連想方法では

    • :パーティションごとに

      • 計算結果の一部(あなたのケースでは、与えられたパーティションのランク)。
      • (必要に応じてパーティションの境界と各パーティションの累積ランク値)必要な情報を収集します。
      • 2番目のスキャンを実行して、先行パーティションの部分集合を修正します。簡単にあなたのケースに合わせて調整することができ、このアプローチの

    一つの例では、あなたの提案をHow to compute cumulative sum using Spark

+0

おかげで多くのことを見つけることができます! JDBCデータソースの「partitionColumn」オプションを使用して、MSSQLからのデータインポート時間を短縮できました。しかし、私がScalaにとって新しくなったので、partitionByのない高密度ランキングの推奨事項は私が消化するのにもっと時間が必要です。しかし、私に指導してくれてありがとう! – Prash

関連する問題