2012-01-27 4 views
26

Processオブジェクトを使用してPythonでワーカープールを使用しようとしています。各作業者(プロセス)は初期化を行い(ほんのわずかな時間がかかります)、一連のジョブ(理想的にはmap()を使用)を渡して何かを返します。それ以上のコミュニケーションは必要ありません。しかし、私は労働者のcompute()機能を使用するためにmap()を使用する方法を理解できないようです。pythonプロセスを持つプール

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

代わりに移動するための方法をキュー、または私はmap()を使用することができます仕事ですか?

+0

すべてのプロセスオブジェクトはステートフルです。その単語をタイトルから削除したい場合があります。また、 'compute'はワーカーのメソッドです。この例では、通常は完全にスタンドアロンの機能です。単純に初期化と処理の両方を含む計算機能を書くのはなぜですか? –

+0

十分な公正、ありがとう。初期化には長い時間がかかりますので、ワーカープロセスごとに1回しか行いません。 – Felix

+0

質問の「一連の仕事を成功させる」ということを強調したいと考える必要があります。それ以来、それは明白ではなかった。 –

答えて

50

私はこのためにキューを使用することをお勧めします。

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

今、あなたはすべてのもののようなものは、あなたが複数のワーカー間で高価なスタートアップコストを償却できるようにする必要があり、単一のキュー

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

から仕事を得るため、これらの山を開始することができます。

+0

それは私が考えたことです、ありがとう!私はジョブキュー(入力)と結果キュー(出力)を使ってすべてを同期させました。 – Felix

+0

あなたは素晴らしい例ですが、strg + cが押されずに押されたときに、センチネルオブジェクトを入力する方法を今すぐ試してみましょう。 – Dukeatcoding

+0

@ S.Lott:キューがpickleに対応していませんか?そのため、あなたは[multiprocessing.Manager().Queue](http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by)を使用します。 -pool-map-async)? – zuuz

4

initializerは、イニシャライズを実行する任意の呼び出し可能ファイルを想定しています。たとえば、サブクラスProcessではなく、いくつかのグローバルを設定できます。 mapは任意のイテラブルを受け入れます。

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))