2017-09-28 12 views
0

multiprocessingパッケージをPython 3.6で使用して、約100個のタスクを実行し、同時に最大4個のタスクを同時に実行する方法を見つけようとしています。Pythonマルチプロセッシング:次の結果を取得

  1. 次の完了したタスクをプールから繰り返し取得し、すべてのタスクが成功するか失敗するまでその戻り値を処理します。
  2. 他のタスクから結果にアクセスできるように、任意のタスクで例外をスローしないでください。

プールに送信されたタスクの順序を維持する必要はありません(つまり、キューは必要ありません)。タスクの総数(上記の「100」)は法外に巨大ではありません。私はそれらを一度に提出して、労働者が利用できるようになるまで待ち行列に入れることを気にしません。

私はmultiprocessing.Poolがこれに適していると思っていましたが、私は繰り返し呼び出すことができる「次の結果を得る」メソッドを見つけることができないようです。

これはプロセス管理プリミティブから自分自身をロールバックする必要がありますか?またはPool(または私が紛失している別のもの)がこのワークフローをサポートできますか?

コンテキストについては、私は数分かかるかもしれないリモートプロセスを呼び出すために各ワーカーを使用していますが、同時にN個のジョブを処理する能力があります(具体的な例では4です)。

+0

http://pyvideo.org/search.html?q=multiprocessing – wwii

+0

@wwiiビデオは特にありますか?その質問に対処することをお勧めしますか? –

+0

一般的に - 私はPyconのビデオがかなり有益であることを知っています。また、[マルチプロセッシングモジュールのドキュメント](https://docs.python.org/3/library/multiprocessing.html)に示されている例は、私が遊んで実験したいときに私を始めさせるのに十分なようです。 – wwii

答えて

0

私は(代わりに4 & 100の2人の労働者& 6つのジョブを使用して示され、)以下のパターンを思い付いた:

import random 
import time 
from multiprocessing import Pool, TimeoutError 
from queue import Queue 


def worker(x): 
    print("Start: {}".format(x)) 
    time.sleep(5 * random.random()) # Sleep a random amount of time 
    if x == 2: 
     raise Exception("Two is bad") 
    return x 


if __name__ == '__main__': 

    with Pool(processes=2) as pool: 
     jobs = Queue() 
     for i in range(6): 
      jobs.put(pool.apply_async(worker, [i])) 

     while not jobs.empty(): 
      j = jobs.get(timeout=1) 
      try: 
       r = j.get(timeout=0.1) 
       print("Done: {}".format(r)) 
      except TimeoutError as e: 
       jobs.put(j) # Not ready, try again later 
      except Exception as e: 
       print("Exception: {}".format(e)) 

はかなりうまく動作するようです:

Start: 0 
Start: 1 
Start: 2 
Done: 1 
Start: 3 
Exception: Two is bad 
Start: 4 
Start: 5 
Done: 3 
Done: 4 
Done: 5 
Done: 0 

I'LL私がキューイングを管理するための一般的なユーティリティを作ることができるかどうかを見てください。

私が考えている主な欠点は、完成したジョブはしばらくの間気づかれず、完了していないジョブはポーリングされてタイムアウトする可能性があるということです。それを回避するには、おそらくコールバックを使う必要があります。十分に大きな問題になったら、おそらくそれを私のアプリケーションに追加します。

+0

ジョブだけでなく結果をキューに入れるべきです。 'apply_async'の' callback'パラメータに対して、結果をキューに入れる関数を設定することができます。別のスレッドは結果を順次取得します。 –

関連する問題