私は、いくつかのDenseVectorに解析してSparkMLで最終的に使用するパイプラインを作成しています。モデルへの入力とコンピューティングリソースの両方に最適なトレーニングパラメータを見つけるために、たくさんの繰り返しを行いたいと思います。私が扱っているデータフレームは、YARNクラスター上のエグゼキュータのダイナミックな数にわたって50〜100GBの間のどこかにあると言われています。Pyspark:メモリーをオーバーフローさせずにhadoopまたはhdfsにデータフレームを保存しますか?
私がセーブすると、パーケットまたはsaveAsTableのいずれかに失敗すると、一連の失敗したタスクが発生し、最後に完全に失敗し、spark.yarn.executor.memoryOverheadが発生することが示唆されます。各id
は1行で、わずか数kbです。
feature_df.write.parquet('hdfs:///user/myuser/featuredf.parquet',mode='overwrite',partitionBy='id')
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 98 in stage 33.0 failed 4 times, most recent failure: Lost task 98.3 in
stage 33.0 (TID 2141, rs172.hadoop.pvt, executor 441): ExecutorLostFailure
(executor 441 exited caused by one of the running tasks) Reason: Container
killed by YARN for exceeding memory limits. 12.0 GB of 12 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
私は現在2gでこれを持っています。
スパーク作業員は現在10ギガバイト、ドライバ(クラスタにはない)は最大5ギガバイトのmaxResultSizeで16ギガバイトになります。
私が書き込む前にデータフレームをキャッシュしていますが、トラブルシューティングのために他に何ができますか?
編集:すべての変換を一度にやっているようです。
== Physical Plan ==
InMemoryTableScan [id#0L, label#90, features#119]
+- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Filter (isnotnull(id#0L) && (id#0L < 21326835))
+- InMemoryTableScan [id#0L, label#90, features#119], [isnotnull(id#0L), (id#0L < 21326835)]
+- InMemoryRelation [id#0L, label#90, features#119], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [id#0L, label#90, pythonUDF0#135 AS features#119]
+- BatchEvalPython [<lambda>(collect_list_is#108, 56845.0)], [id#0L, label#90, collect_list_is#108, pythonUDF0#135]
+- SortAggregate(key=[id#0L, label#90], functions=[collect_list(indexedSegs#39, 0, 0)], output=[id#0L, label#90, collect_list_is#108])
+- *Sort [id#0L ASC NULLS FIRST, label#90 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#0L, label#90, 200)
+- *Project [id#0L, UDF(segment#2) AS indexedSegs#39, cast(label#1 as double) AS label#90]
+- *BroadcastHashJoin [segment#2], [entry#12], LeftOuter, BuildRight
:- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, reka_data_long_all_files
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- *Project [cast(entry#7 as string) AS entry#12]
+- HiveTableScan [entry#7], MetastoreRelation reka_trop50, public_crafted_audiences_sized
実行している多くの繰り返し中にデータが歪んでいるかどうかを確認しましたか。ここでは、誰かが同じような状況に直面していた1つの例 - http://stackoverflow.com/questions/43081465/spark-container-executor-ooms-during-reducebykey – Pushkr
いいえ、私ができるまでは繰り返しを実行したくないからですプロセスの開始時に準備されたデータをディスクから直接読み込むので、まだ繰り返しはありません。 –