2017-07-11 9 views
0

最初の質問はスタックオーバーフローなので、ご了承ください。私はグループの格付け(長いnumpy配列)の分散を計算するために探しています。並列処理を行わずにプログラムを実行すると問題はありませんが、各プロセスが独立して実行できる場合は、32個のグループがありますので、処理速度を上げるためにマルチプロセッシングを利用したいと考えています。これは少数のグループ<には問題ありませんが、この後はあまり頻繁に実行されることはありませんが、この後プログラムは不特定多数のグループ(通常は20〜30)でエラーメッセージなしで実行を停止することがよくあります。配列は非常に大きく(21451 x 11462ユーザーの項目評価)、エラーメッセージは表示されませんが、メモリが不足しているために問題が発生しているのではないかと思います。上記のコードを実行Pythonマルチプロセッシングpool.mapがあまりにも多くのワーカープロセスで応答しない

import numpy as np 
from functools import partial 
import multiprocessing 

def variance_parallel(extra_matrices, group_num): 
    # do some variation calculation 
    # print confirmation that we have entered function, and group number 
    return single_group_var 

def variance(extra_matrices, num_groups): 
    variance_partial = partial(variance_parallel, extra_matrices) 
    for g in list(range(num_groups)): 
     group_var = pool.map(variance_partial,range(g)) 
    return(group_var)  

num_cores = multiprocessing.cpu_count() - 1 
pool = multiprocessing.Pool(processes=num_cores) 
variance(extra_matrices, num_groups) 

徐々にそれが最終的に前([0]、[0,1]、[0,1,2]、...)に分散をチェックしているグループの数を構築するプログラムを示し何も印刷しない。

私の書式設定/質問が少しずれている場合は、事前にお手伝いをしていただきありがとうございます。

+0

コードがすでにメモリ不足の場合は、最大数のコアを使用することをお勧めします。 –

答えて

0
  • 複数のプロセスはプロセスに送ら
  • データ配列が大きいので、問題は大きな配列のコピーとをどうする可能性が非常に高い

コピーする必要がありデータを共有していませんプロセスにさらに、Pythonのマルチプロセッシングでは、データをプロセスに送ることは、(a)CPUを集中的に使用し、(b)それ自身で余分なメモリを必要とするシリアル化によって行われます。

短いマルチ処理では、使用するケースに適していません。 numpyはネイティブコード拡張(GILは適用されません)であり、スレッドセーフなので、マルチプロセッシングの代わりにスレッドを使用するのが最適です。スレッドを使用すると、ワーカースレッドは親プロセスのアドレス空間を介してデータを共有できます。そのため、コピーする必要はありません。

これにより、プログラムのメモリ不足を防ぐ必要があります。

しかし、スレッドがアドレス空間を共有するには、共有するデータをPythonクラスのようにオブジェクトにバインドする必要があります。

コードサンプルが不完全なため、以下のようなものがテストされています。

import numpy as np 
from functools import partial 
from threading import Thread 
from multiprocessing import cpu_count 

class Variance(Thread): 

    def __init__(self, extra_matrices, group_num): 
     Thread.__init__(self) 
     self.extra_matrices = extra_matrices 
     self.group_num = group_num 
     self.output = None 

    def run(self): 
     # do some variation calculation 
     # print confirmation that we have entered function, and group number 
     self.output = single_group_var 

num_cores = cpu_count() - 1 
results = [] 
for g in list(range(num_groups)): 
    workers = [Variance(extra_matrices, range(g)) 
       for _ in range(num_cores)] 
    # Start threads 
    for worker in workers: 
     worker.start() 
    # Wait for completion 
    for worker in workers: 
     worker.join() 
    results.extend([w.output for w in workers]) 
print results 
関連する問題