2017-12-07 6 views
1

私はプールで非同期にnumpyのファイルをロードしようとしています:Pythonのマルチプロセッシングapply_async "0>左の主張" AssertionErrorが

self.pool = Pool(2, maxtasksperchild = 1) 
... 
nextPackage = self.pool.apply_async(loadPackages, (...)) 
for fi in np.arange(len(files)): 
    packages = nextPackage.get(timeout=30) 
    # preload the next package asynchronously. It will be available 
    # by the time it is required. 
    nextPackage = self.pool.apply_async(loadPackages, (...)) 

方法 "loadPackages":でも最初の前

def loadPackages(... (2 strings & 2 ints) ...): 
    print("This isn't printed!') 
    packages = { 
     "TRUE": np.load(gzip.GzipFile(path1, "r")), 
     "FALSE": np.load(gzip.GzipFile(path2, "r")) 
    } 
    return packages 

」パッケージ "がロードされると、次のエラーが発生します。

スレッド8の例外:トレースバック(最新の呼び出し最後):
ファイル「C:\ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ threading.py」914行、 (* self._args、** self._kwargs)ファイル "C:\ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ pool"を入力してください。 「ファイル:C:\ Users \ roman \ Anaconda3 \ envs \ tsc1 \ lib \ multiprocessing \ connection.py」、 行250、recv buf = self .recv_bytes()ファイル "C:¥Users¥roman¥Anaconda3¥envs¥tsc1¥lib¥mult​​iprocessing¥connection.py"、 行318、_recv_bytes内 return self._get_more_data(ov、maxsize)ファイル "C:\ Users \ roman \ Anaconda3 \ envs \ ts C1 \ libに\ \ connection.py」、 ライン337、> 0 AssertionErrorが

左_get_more_data アサートに、私は密接にリソースを監視マルチプロセッシング:エラーが発生したときにメモリが問題ではありませんが、私はまだたくさん残っています。 解凍されたファイルは単なる多次元numpy配列です。 個別に、より簡単な方法でプールを使用し、そのようにファイルを読み込みます。それだけが失敗する。 (これはカスタムケラスジェネレータで起こりますが、私はこれが助けになるとは思っていませんが、誰が知っているのでしょうか?)Python 3.5。

この問題の原因は何ですか?このエラーはどのように解釈できますか?

ありがとうございました!

+0

私は同じ問題を抱えています。また、十分なRAMが残っています。問題の解決策を見つけましたか? –

答えて

1

小さなチャンクでデータを取得することで回避策が見つかったと思います。私の場合、それはリストのリストでした。

私が持っていた:

に変更
for i in range(0, NUMBER_OF_THREADS): 
    print('MAIN: Getting data from process ' + str(i) + ' proxy...') 
    X_train.extend(ListasX[i]._getvalue()) 
    Y_train.extend(ListasY[i]._getvalue()) 
    ListasX[i] = None 
    ListasY[i] = None 
    gc.collect() 

CHUNK_SIZE = 1024 
for i in range(0, NUMBER_OF_THREADS): 
    print('MAIN: Getting data from process ' + str(i) + ' proxy...') 
    for k in range(0, len(ListasX[i]), CHUNK_SIZE): 
     X_train.extend(ListasX[i][k:k+CHUNK_SIZE]) 
     Y_train.extend(ListasY[i][k:k+CHUNK_SIZE]) 
    ListasX[i] = None 
    ListasY[i] = None 
    gc.collect() 

をそして今、おそらく一度に少ないデータをシリアル化することで、動作しているようです。 データを小さな部分に分けることができれば、問題を克服することができます。がんばろう!

+0

あなたの答え、Franciscoありがとう! 残念ながら、私はここで何が起こったのかについての説明がまだ不足しています。私はまた別のアーキテクチャでそれを取り組んだ。 0より大きいアサーションは、パイプの内容をバッファにコピーすることによって得られます。「関数が成功すると、戻り値はゼロ以外です」 GetLastErrorを使うと、意味のあるエラーメッセージを得ることができます...しかし、当面はこの問題を回避する方法を見つけました。 これは単なる回避策であるため、この回答を回答としてマークしないことを私に許してください。 – Doidel

+0

@Doidelもちろん、それはちょっとした手助けをしていた試みでした:] –

関連する問題