2012-06-15 58 views
9

Pool.apply_asyncを使用して多数のタスク(パラメータが大きい)を実行すると、プロセスが割り当てられて待機状態になり、待機プロセス数に制限はありません。 、Pythonマルチプロセッシング:待機中のプロセス数を制限する方法は?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

私は待っているプロセスの数が限られているように、待ちキューを制限する方法を探しています:これは、以下の例のように、すべてのメモリを食べることによって終わることができ待ち行列が一杯になっている間にPool.apply_asyncがブロックされます。

+0

いい例(+1)。 – mgilson

答えて

6

multiprocessing.Poolは、multiprocessing.Queueのメンバーであり、任意のmaxsizeパラメータを取ります。残念ながら、それはmaxsizeパラメータセットなしでそれを構築する。

私はmaxsizeコンストラクタ_taskqueueに渡しmultiprocessing.Pool.__init__のコピー&ペーストしてサブクラス化multiprocessing.Poolをお勧めします。

モンキー・パッチの適用対象(プールまたはキューのどちらか)も動作しますが、あなたはそれが非常に脆くなりpool._taskqueue._maxsizepool._taskqueue._semをモンキーパッチする必要があると思います:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

私はPython 2.7.3を使用しています。_taskqueueの型はQueue.Queueです。つまり、マルチプロセッシングではなく、単純なキューであることを意味します。サブ処理しているマルチプロセッシング.Poolとオーバーライド__init__はうまく動作しますが、オブジェクトのサルペッチ処理が期待どおりに機能していません。しかし、これは私が探していたハックです、ありがとう。 –

0

あなたは明示的にキューを追加することができますmaxsizeパラメータを使用し、pool.apply_async()の代わりにqueue.put()を使用します。そして、ワーカー・プロセスができます

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

'imap'を使うと違いはありません。入力キューはまだ無制限で、このソリューションを使用するとすべてのメモリが消費されます。 – Radim

+0

@Radim:あなたが無限のジェネレータを与えるとしても、答えの 'imap'コードは動作します。 – jfs

+0

Python 2では、残念ながら(py3のコードを見ていない)。いくつかの回避策については、[this so answer](http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration)を参照してください。 – Radim

1
:あなたはアクティブなワーカー・プロセスの約数にメモリ内にある作成した入力引数/結果の数を制限したい場合は

for a, b in iter(queue.get, sentinel): 
    # process it 

あなたはpool.imap*()メソッドを使用することができます

pool._taskqueueが希望のサイズを超えている場合は待ちます。

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

これを追加して、マルチプロセッシングに関する私の記憶上の問題の最も簡単な解決策であることを発見しました。私はmax_apply_size = 10を使用して、それは私の問題ではうまくいきます。これはファイルの変換が遅いことです。 @ecatmurとしてセマフォを使用することは、より堅牢なソリューションのように思えますが、単純なスクリプトでは過度のものになる可能性があります。 – Nate

関連する問題