2017-10-31 3 views
2

私はdask/distributedを使用して、100ノード以上の機能の評価をマルチノードクラスタに送信しています。各評価は非常に高価で、CPU時間は約90秒です。私は気付いたが、メモリリークがあり、時間の経過とともにすべての労働者がサイズが大きくなることに気づいた。評価している機能は純粋ではない。ここ は、この現象を再現するためのサンプルコードです:ダスクのメモリリークやデータ永続性が分散

import numpy as np 
from dask.distributed import Client 

class Foo: 
    def __init__(self): 
     self.a = np.random.rand(2000, 2000) # dummy data, not really used 

    @staticmethod 
    def myfun1(k): 
     return np.random.rand(10000 + k, 100) 

    def myfun2(self, k): 
     return np.random.rand(10000 + k, 100) 

client = Client('XXX-YYY:8786') 
f = Foo() 
tasks = client.map(f.myfun2, range(100), pure=False) 
results = client.gather(tasks) 
tasks = [] 

client.map場合は()の労働者のサイズは成長しない、(単なる静的メソッドである))(f.myfun1を実行するために呼び出されます。しかし、f.myfun2()を呼び出すと、上記のclient.map()の呼び出しが1回しかないと、workerのサイズがかなり大きくなります(たとえば50mb - > 400mb)。また、client.close()はワーカーのサイズを減らすために何もしません。

これはメモリリークですか、私は正しくdask.distributedを使用していませんか?後で利用できる計算結果やクラスタ上で共有されている結果については気にしません。 FWIW、分散v1.19.1とPython 3.5.4でテスト済み

答えて

0

いい例。

お客様のmyfun2メソッドは、オブジェクトに添付されており、非常に大きな属性(f.a)を持ち歩いています。このf.myfun2メソッドは、実際には移動するのに実際には高価で、1000個作成しています。大規模なオブジェクトのメソッドを分散した設定で使用することを避けるのが最善の方法です。代わりに、関数を使うことを考えてください。

+0

ありがとうマシュー。ここで問題となるのは、実際にデータを移動させるコストではありませんが、計算後には何が起きますか?つまり、すべての労働者のサイズ/フットプリントが時間の経過とともに大きく増加するということです。これを修正する唯一の方法は、client.restart()を定期的に実行することですが、それはクラスタ全体をリセットします。 私の目標が労働者の間でデータ/結果を共有する必要なしに高価な/長い計算を労働者に提出するだけの場合は、これを行うための良い方法がありますか? – marioba