2016-04-01 13 views
2

私はスパークを使用していますいくつかの計算を行います。 5分ごとに、私は新しいデータフレームを得ました。私は、辞書に入って来て、古いデータフレームは、辞書から飛び出し、このsparkでデータフレームを解放するには?

dict_1_hour[timestamp] = dataframe 

新しいデータフレームのようdict_1_hourと呼ばれる辞書にそれを置きます。最新の1時間のデータは12データフレームのみが保持されます。

私はそれらのデータフレームをどのように解放してメモリリークがないことを確認する必要がありますか?データフレームのための

一つのAPIは、それを行うことができそうです。

unpersist(blocking=True) 
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk. 

私は辞書からのデータフレームをポップだと思う他の方法(私が何のためのパラメータを知りません)。

dict_1_hour.pop(timestamp) 

Pythonは、未使用の変数を自動的に解放する必要があります。しかし、ここではそれが適切かどうかはわかりません。私はあなたが私がRDDに似たすべてのDataFrame、のしてください

+0

は、なぜあなたはメモリリークがあると思いますか? Pythonはガベージコレクションされており、一般的にあなたのためにメモリを管理するのに非常に優れています。 –

+0

実際にメモリリークが発生しています。メモリリークにより、Spark Driverプログラムが@Akshat Mahajanを停止させた –

答えて

2

ファーストを使用する必要がありますどの方法でそれ明示的

そうしてくださいアドバイスを放出しない場合に火花がデータフレームを保つことが心配、ちょうど地元の再帰的なデータ構造です。私は、PythonとJVMの両方で、他のオブジェクトと同じガベージコレクションサイクルを実行します。

考慮すべき第2の部分は、持続データ(cachepersistcacheTable、シャッフルファイルなど)です。これは一般的にSparkによって内部的に処理され、unpersistを除き、あなたはその寿命をあまり支配しません。

この2つのことを念頭に置いて、オブジェクト上の単純なdelを超えて行うことはあまりありません。 DataFrameは一時テーブルとして登録されている場合

try: 
    del dict_1_hour[timestamp] 
except KeyError: 
    pass 

それでも、最初に登録を解除することを確認してください:

from py4j.protocol import Py4JError 

try: 
    sqlContext.dropTempTable("df") 
except Py4JError: 
    pass 
+0

これら3つの方法の違いは何ですか? 1. del dict_1_hour [タイムスタンプ] 2. dict_1_hour [タイムスタンプ] .unpersist 3. dict_1_hour.pop(タイムスタンプ)両方ともメモリを解放したと言うことができますか? (popメソッドの場合は、ポップ時にデータフレームを使用している場所はどこにもないと確信しています) –

+0

実際にこれらのどれもメモリを解放しません。単純な 'pop'でカバーされていない' del'または 'del()')は、外部リソースを解放するために使用できる '__del__'メソッドを呼び出すことです(' DataFrame'は 'オブジェクトなので、面白いことは何もありません)。 – zero323

+0

それからアンパッシャーはどうですか?私は、データフレームを明示的に保持するためにキャッシュ/持続を使用しないと、unpersistを呼び出す必要はありません。 –

関連する問題