私は、パーケージをループしたときにSparkがメモリの問題によりクラッシュするのを防ぐ方法を見つけようとしているファイルおよびいくつかの後処理機能を提供します。私はPySparkを使っています。これが適切なStack Overflowフォームを破るならば、謝罪してください!スクリプト内のSparkパーケットファイルを反復/ループすると、メモリエラー/ビルドが発生する(Spark SQLクエリを使用)
基本的な擬似コードは次のとおりです。
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
このコードでは、スパークSQLクエリを使用していますので、私はそのことができます(SQLクエリ/機能のすべてでラッパー関数を作成し、foreachの中にそれを渡すことで成功してきました標準forループとは対照的に、入力としてsparkContextまたはsqlQueryを使用しないでください)。
技術的には、これはパーティションを持つ1つの大きな寄木細工のファイルですが、すべてを一度に読み込んでそれを照会するのはずっと大です。私は各パーティション上で関数を実行する必要があります。だから、PySparkでPythonループを走らせて、各ループで1つの寄木細工パーティション(サブディレクトリ)を処理し、関連する出力レポートを作成するだけです。
大規模なmapPartition()の周りのすべてのコードを囲むことが、寄木細工のファイル全体のサイズのために機能するかどうかは不明ですか?
しかし、数回のループの後に、メモリエラー、具体的にはJavaヒープエラーのためにスクリプトがクラッシュします。私はスパークを実行することを意味していません実現
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
;(これはランダムなファイルが第二または第三のループに読み込まれているもので発生します。私は、ループのクラッシュのためのファイルについての特別な何もないことを確認しています)これらのSQLクエリは、標準のSpark SQLパッケージ関数では複雑すぎるため、さまざまな集計統計で各ファイルの複数の集計レポートを作成します。
基本的に各ループインデックスの終わりにメモリをクリアする方法はありますか? sqlContext.dropTempTable()を使用して登録された一時テーブルを削除し、sqlContext.clearCache()を使用してキャッシュをクリアすることは役に立ちませんでした。私がsparkContextを停止して各ループで再起動しようとすると、いくつかのプロセスがまだ "ラップ"されていないのでエラーが発生します(コンテキストを "正常に"停止することができたようですが、現在のPySparkのドキュメントでこれを見つけることができませんでした。)
私は、私がそれらを終えた後にループ内のデータフレームにunpersist()を呼び出すのではなく、私もpersist()を呼び出していませんそれら;私は各ループ内のデータフレームに書き直すだけです(これは問題の一部になる可能性があります)。
エンジニアリングチームと協力してメモリ設定を調整していますが、このスクリプトの1つのループを完了するのに十分なメモリを割り当てていることがわかります(エラーなしで1つのループが実行されます)。
Sparkよりもこのユースケースで優れているツールを含め、何かアドバイスがあれば助かります。私はSparkバージョン1.6.1を使用しています。
また、Spark初心者のためのFYI:関数がUDF(通常は幾分単純であることを意味するか、またはテーブル全体ではなくただ1つの列を返すことを意味する)でなければ、従来のUDF API関数/構文を使用して関数を回避し、コードを1つの長いスクリプトとして記述します。これは醜いように見えるが、これは私のコードをほぼ50%高速化するように思えた。そして、Pythonのループを避けてください。新しいファイル/入力データフレームを処理するたびに新しいSparkコンテキストから開始するように、ループ内でBashのSparkスクリプトを呼び出してください。それは記憶の問題を避けるでしょう。 – kplaney