2017-04-08 6 views
2

私は、いくつかのDenseVectorに解析してSparkMLで最終的に使用するパイプラインを作成しています。モデルへの入力とコンピューティングリソースの両方に最適なトレーニングパラメータを見つけるために、たくさんの繰り返しを行いたいと思います。私が扱っているデータフレームは、YARNクラスター上のエグゼキュータのダイナミックな数にわたって50〜100GBの間のどこかにあると言われています。Pyspark:メモリーをオーバーフローさせずにhadoopまたはhdfsにデータフレームを保存しますか?

私がセーブすると、パーケットまたはsaveAsTableのいずれかに失敗すると、一連の失敗したタスクが発生し、最後に完全に失敗し、spark.yarn.executor.memoryOverheadが発生することが示唆されます。各idは1行で、わずか数kbです。

feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet',mode='overwrite',partitionBy='id') 

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in 
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure 
(executor 441 exited caused by one of the running tasks) Reason: Container 
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead. 

私は現在2gでこれを持っています。

スパーク作業員は現在10ギガバイト、ドライバ(クラスタにはない)は最大5ギガバイトのmaxResultSizeで16ギガバイトになります。

私が書き込む前にデータフレームをキャッシュしていますが、トラブルシューティングのために他に何ができますか?

編集:すべての変換を一度にやっているようです。

== Physical Plan == 
InMemoryTableScan [id#0L, label#90, features#119] 
    +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
     +- *Filter (isnotnull(id#0L) && (id#0L < 21326835)) 
      +- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)] 
        +- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
         +- *Project [id#0L, label#90, pythonUDF0#135 AS features#119] 
          +- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135] 
           +- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108]) 
           +- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0 
            +- Exchange hashpartitioning(id#0L, label#90, 200) 
             +- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90] 
              +- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight 
              :- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files 
              +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) 
               +- *Project [cast(entry#7 as string) AS entry#12] 
                +- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized 
+0

実行している多くの繰り返し中にデータが歪んでいるかどうかを確認しましたか。ここでは、誰かが同じような状況に直面していた1つの例 - http://stackoverflow.com/questions/43081465/spark-container-executor-ooms-during-reducebykey – Pushkr

+0

いいえ、私ができるまでは繰り返しを実行したくないからですプロセスの開始時に準備されたデータをディスクから直接読み込むので、まだ繰り返しはありません。 –

答えて

0

最終的に私はスパークuserメーリングリストから得た手がかりは、パーティションでバランスとサイズの両方を見ていました。プランナがそれを持っていたので、1つの実行者インスタンスにあまりにも多くのものが与えられました。書き込まれるデータフレームを作成する式に.repartition(1000)を追加することで、すべての違いが生まれました。おそらく、巧妙なキー列を作成して分割することで、より多くの利益を得ることができます。

0

私は、動的割り当てを無効にすることをお勧めします。以下の設定でそれを実行してみてください:

--master yarn-client --driver-memory 15g --executor-memory 15g --executor-cores 10 --num-executors 15 -Dspark.yarn.executor.memoryOverhead=20000 -Dspark.yarn.driver.memoryOverhead=20000 -Dspark.default.parallelism=500 
+0

運がない。大規模なデータフレームの1つでsaveAsTable()を実行しようとするまで、すべてうまく行きます。 Produces: "17/04/11 03:36:14 ERROR cluster.YarnScheduler:rs209.hadoop.pvtでエグゼクティブ4が失われました:YARNによってメモリ制限を超えてコンテナが殺されました17.0 GBの物理メモリが使用されました。 yarn.executor.memoryOverhead。 " –

+0

- マスター糸クライアント - ドライバメモリ15g - 実行メモリ15g - 実行コア10 - num実行プログラム15 -Dspark.yarn.executor.memoryOverhead = 100000 - Dspark.yarn.driver.memoryOverhead = 20000 -Dspark.default.parallelism = 500 –

+0

変更なし。また、データフレームを書き込む前にキャッシュに気を付けましたが、違いはありません。 –

関連する問題