2016-05-20 5 views
4

私は、パーケージをループしたときにSparkがメモリの問題によりクラッシュするのを防ぐ方法を見つけようとしているファイルおよびいくつかの後処理機能を提供します。私はPySparkを使っています。これが適切なStack Overflowフォームを破るならば、謝罪してください!スクリプト内のSparkパーケットファイルを反復/ループすると、メモリエラー/ビルドが発生する(Spark SQLクエリを使用)

基本的な擬似コードは次のとおりです。

#fileNums are the file name partitions in the parquet file 
#I read each one in as a separate file from its "=" subdirectory 
for counter in fileNums: 
    sparkDataFrame = sqlContext.read.parquet(counter) 
    summaryReportOne = sqlContext.sql.("SELECT.....") 
    summaryReportOne.write.partition("id").parquet("/") 
    summaryReportTwo = sqlContext.sql.("SELECT....") 
    summaryReportTwo.write.partition("id").parquet("/") 
    #several more queries, several involving joins, etc.... 

このコードでは、スパークSQLクエリを使用していますので、私はそのことができます(SQLクエリ/機能のすべてでラッパー関数を作成し、foreachの中にそれを渡すことで成功してきました標準forループとは対照的に、入力としてsparkContextまたはsqlQueryを使用しないでください)。

技術的には、これはパーティションを持つ1つの大きな寄木細工のファイルですが、すべてを一度に読み込んでそれを照会するのはずっと大です。私は各パーティション上で関数を実行する必要があります。だから、PySparkでPythonループを走らせて、各ループで1つの寄木細工パーティション(サブディレクトリ)を処理し、関連する出力レポートを作成するだけです。

大規模なmapPartition()の周りのすべてのコードを囲むことが、寄木細工のファイル全体のサイズのために機能するかどうかは不明ですか?

しかし、数回のループの後に、メモリエラー、具体的にはJavaヒープエラーのためにスクリプトがクラッシュします。私はスパークを実行することを意味していません実現

Caused by: com.google.protobuf.ServiceException:  
java.lang.OutOfMemoryError: Java heap space 
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244) 
at com.sun.proxy.$Proxy9.delete(Unknown Source) 
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526) 
... 42 more 
Caused by: java.lang.OutOfMemoryError: Java heap space 

;(これはランダムなファイルが第二または第三のループに読み込まれているもので発生します。私は、ループのクラッシュのためのファイルについての特別な何もないことを確認しています)これらのSQLクエリは、標準のSpark SQLパッケージ関数では複雑すぎるため、さまざまな集計統計で各ファイルの複数の集計レポートを作成します。

基本的に各ループインデックスの終わりにメモリをクリアする方法はありますか? sqlContext.dropTempTable()を使用して登録された一時テーブルを削除し、sqlContext.clearCache()を使用してキャッシュをクリアすることは役に立ちませんでした。私がsparkContextを停止して各ループで再起動しようとすると、いくつかのプロセスがまだ "ラップ"されていないのでエラーが発生します(コンテキストを "正常に"停止することができたようですが、現在のPySparkのドキュメントでこれを見つけることができませんでした。)

私は、私がそれらを終えた後にループ内のデータフレームにunpersist()を呼び出すのではなく、私もpersist()を呼び出していませんそれら;私は各ループ内のデータフレームに書き直すだけです(これは問題の一部になる可能性があります)。

エンジニアリングチームと協力してメモリ設定を調整していますが、このスクリプトの1つのループを完了するのに十分なメモリを割り当てていることがわかります(エラーなしで1つのループが実行されます)。

Sparkよりもこのユースケースで優れているツールを含め、何かアドバイスがあれば助かります。私はSparkバージョン1.6.1を使用しています。

答えて

0

更新:私が各ループでそれを終えた後に私がSQLクエリから作る各テーブルでunpersist()を呼び出すと、ループはメモリの問題なしに次の繰り返しに正常に続行できます。 .clearCache()と上記のように一時テーブルを単独で削除すると、そのトリックは実行されませんでした。私の推測では、テーブルがsparkSQLクエリからのもので、RDDを返すため、これはうまくいきました。

これらのRDDにpersist()を呼び出さなかったにもかかわらず、Sparkに次のループが開始される前にそれらをクリアして、同じSQL名を新しい変数に割り当てることができました。

+0

また、Spark初心者のためのFYI:関数がUDF(通常は幾分単純であることを意味するか、またはテーブル全体ではなくただ1つの列を返すことを意味する)でなければ、従来のUDF API関数/構文を使用して関数を回避し、コードを1つの長いスクリプトとして記述します。これは醜いように見えるが、これは私のコードをほぼ50%高速化するように思えた。そして、Pythonのループを避けてください。新しいファイル/入力データフレームを処理するたびに新しいSparkコンテキストから開始するように、ループ内でBashのSparkスクリプトを呼び出してください。それは記憶の問題を避けるでしょう。 – kplaney

1

可能な場合は、新しくリリースされたspark 2.0にアップグレードしてください。

私はあなたと同じようにJavaのヒープスペースについてよく似た問題に遭遇していました。私は、データフレームを作成し、最初にspark 1.6.2を何度も繰り返し呼び出すプロセスを繰り返すことで、4Gのヒープスペースを超えることができました。

SparkSessionを使用しているspark 2.0では、同じプログラムが1.2GBのヒープスペースしか持たなかったため、実行していたプログラムが期待するように、メモリ使用量は非常に安定していました。

関連する問題