2012-04-27 7 views
0

私はプールの割り当てのためのこの動作をPythonで確認します。私はプールに20のプロセスを持っていますが、実行するすべてのプロセスを投げるのではなく、8つのプロセスに対してmap_asyncを実行すると、わずか4回しか実行されません。それらの4つが終わると、それは2つを送り、そして2つの終わりが1つを送るのであるとき。マルチプロセッシング - プールの割り当て

私は20以上をスローすると、上記の動作が繰り返されるときにキュー内で20未満になるまで、すべて20を実行します。

これは意図的に行われていると仮定していますが、奇妙に見えます。私の目標は、要求が来てすぐに処理されるようにすることです。明らかにこの動作は適合しません。

maxtasksperchildサポート

私はそれをどのように向上させることができます任意のアイデアをためbilliardでのpython 2.6を使用していますか?

コード:

私は機能function()を使用したいデータを持っている:私はPythonでマルチプロセッシングを扱う

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10) 

while True: 
    lines = DbData.GetAll() 
    if len(lines) > 0: 
     print 'Starting to process: ', len(lines), ' urls' 
     Res = mypool.map_async(RunChild, lines) 
     Returns = Res.get(None) 
     print 'Pool returns: ', idx, Returns 
    else: 
     time.sleep(0.5) 

答えて

2

一つの方法は、次のようです。あなたが追加されているものは完全に制御することができます。このように

results = [] 
while idqueue.qsize() < nbprocess: 
    pass 
while resultqueue.qsize() > 0: 
    results.append(resultqueue.get()) 

import multiprocessing 

class ProcessThread(multiprocessing.Process): 
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue): 
     self.id_t = id_t 
     self.inputlist = inputqueue 
     self.idqueue = idqueue 
     self.function = function 
     self.resultqueue = resultqueue 

     multiprocessing.Process.__init__(self) 

    def run(self): 
     s = "process number: " + str(self.id_t) + " starting" 
     print s 
     result = [] 

     while self.inputqueue.qsize() > 0 
      try: 
       inp = self.inputqueue.get() 
      except Exception: 
       pass 
      result = self.function(inp) 
      while 1: 
       try: 
        self.resultqueue.put([self.id,]) 
       except Exception: 
        pass 
       else: 
        break 
      self.idqueue.put(id) 
      return 

と主な機能::

inputqueue = multiprocessing.Queue() 
resultqueue = multiprocessing.Queue() 
idqueue = multiprocessing.Queue() 

def function(data): 
    print data # or what you want 

for datum in data: 
    inputqueue.put(datum) 

for i in xrange(nbprocess): 
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start() 

し、最終的に得られる結果
まず私は、マルチプロセッシングのサブクラスを作成しますプロセスと他のものと。 マルチプロセスの使用inputqueueは、異なるプロセスをキューに同時にアクセスするため(例外を使用する理由)、各データの計算が非常に遅い(< 1,2秒)場合にのみ効率的な手法です。関数が非常に迅速に計算される場合は、bginingでデータを1回だけ分割し、最初のすべてのプロセスのデータセットのチャンクを配置することを検討してください。

+0

ありがとうございました。これはスクリプトを改善するのに役立ちました。私は、デフォルトのプール処理を取り除き、あなたの例に基づいて自分自身を実装しました。 – SorinV

関連する問題