2012-04-17 11 views
21

プール内の特定のワーカーによって実行されているジョブが実行中のワーカーを知る方法で、Pythonマルチプロセッシングプールの各ワーカーに一意のIDを割り当てる方法はありますかそれ?ドキュメントによると、Processは、名前だけで識別目的のために使用される文字列ですnameが、Pythonマルチプロセッシングプール内でワーカーの一意のIDを取得

を持っています。それは意味を持たない の意味です。複数のプロセスに同じ名前を付けることができます。

私の場合、4つのGPUのグループに対して多数のジョブを実行し、そのジョブを実行するGPUのデバイス番号を設定する必要があります。ジョブの長さが不均一なので、前のジョブが完了する前にジョブを実行しようとしているジョブのGPUに衝突がないことを確認したいと思います(これにより、ジョブのIDをあらかじめ割り当てる事前に作業単位)。

+1

UUIDのようなランダムなものを使用しないのはなぜ? –

+0

@ LuperRouch - あなたはそれが意味するものを拡大できますか? – JoshAdel

+1

例えば '' process = Process(target = foo、name = uuid.uuid4()。hex) ''はプロセスに一意の名前を与えます。 –

答えて

38

あなたが望むように思えるのは単純です:multiprocessing.current_process()。たとえば:

import multiprocessing 

def f(x): 
    print multiprocessing.current_process() 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

出力:

$ python foo.py 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-3, started daemon)> 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-4, started daemon)> 
[0, 1, 4, 9, 16, 25] 

これは、プロセスオブジェクト自身を返すので、プロセスは、独自のアイデンティティすることができます。また、固有の数値idのためにidを呼び出すこともできます - cpythonでは、これはプロセスオブジェクトのメモリアドレスですから、私はそうではありません重複の可能性があります。最後に、プロセスのidentまたはpidプロパティを使用できますが、これはプロセスの開始後にのみ設定されます。

さらに、ソースを調べると、自動生成された名前(上記のProcessの最初の値に例示されているように)が一意である可能性が非常に高いようです。 multiprocessingは、すべてのプロセスに対してitertools.counterオブジェクトを保持します。このオブジェクトは、それが生成する子プロセスのタプルを_identityとして生成するために使用されます。したがって、トップレベルのプロセスでは、単一値のIDを持つ子プロセスが生成され、2値のIDを持つプロセスが生成されます。次に、Processコンストラクタに名前が渡されない場合は、':'.join(...)を使用して、_identityに基づいて単純にautogenerates the nameになります。次にreplaceを使用するプロセスのPoolalters the nameは、自動生成されたIDを同じままにします。

このすべての結論は2個のProcess ESがは、あなたがそれらを作成するとき彼らに同じ名前を割り当てることができますので、同じ名前を持つことが、あなたが名前には触れていない場合、彼らはユニークであるということですパラメータ。また、理論的には_identityを一意の識別子として使用できます。しかし、私は彼らがその理由を理由にプライベート変数を作りました!

アクションで上記の例:

import multiprocessing 

def f(x): 
    created = multiprocessing.Process() 
    current = multiprocessing.current_process() 
    print 'running:', current.name, current._identity 
    print 'created:', created.name, created._identity 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

出力:

$ python foo.py 
running: PoolWorker-1 (1,) 
created: Process-1:1 (1, 1) 
running: PoolWorker-2 (2,) 
created: Process-2:1 (2, 1) 
running: PoolWorker-3 (3,) 
created: Process-3:1 (3, 1) 
running: PoolWorker-1 (1,) 
created: Process-1:2 (1, 2) 
running: PoolWorker-2 (2,) 
created: Process-2:2 (2, 2) 
running: PoolWorker-4 (4,) 
created: Process-4:1 (4, 1) 
[0, 1, 4, 9, 16, 25] 
1

あなたがIDを格納し、プール・プロセスの初期化時にIDを取得するためにmultiprocessing.Queueを使用することができます。

利点:

  • あなたが内部に頼る必要はありません。
  • ユースケースがリソース/デバイスを管理する場合は、デバイス番号を直接入力することができます。これにより、デバイスが2回使用されないことが保証されます。プールより多くのプロセスがデバイスよりも多い場合、追加のプロセスはqueue.get()でブロックされ、作業は実行されません(これはあなたのポルグラムをブロックしません。私がテストしたとき)。

短所:

  • は、追加の通信オーバーヘッドとを持つプール プロセスを生成することはほんの少し時間がかかります:なしすべての作業が他人のように、最初のプロセスで実行されることがあります 例でsleep(1) はまだ初期化されていません。
  • あなたは(私はそれを回避する方法を知らない または少なくとも)グローバルが必要

例:

import multiprocessing 
from time import sleep 

def init(queue): 
    global idx 
    idx = queue.get() 

def f(x): 
    global idx 
    process = multiprocessing.current_process() 
    sleep(1) 
    return (idx, process.pid, x * x) 

ids = [0, 1, 2, 3] 
manager = multiprocessing.Manager() 
idQueue = manager.Queue() 

for i in ids: 
    idQueue.put(i) 

p = multiprocessing.Pool(8, init, (idQueue,)) 
print(p.map(f, range(8))) 

出力:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)] 

。なお、そこにプールには8つのプロセスがあり、1つのidxは1つのプロセスでのみ使用されますが、4つの異なるpidです。

0

私はこれをスレッドで行い、a queueを使用してジョブ管理を処理しました。ここにベースラインがあります。私の完全なバージョンはtry-catchesの束を持っています(特に失敗時にq.task_done()が呼び出されていることを確認するために、特にワーカーで)。

from threading import Thread 
from queue import Queue 
import time 
import random 


def run(idx, *args): 
    time.sleep(random.random() * 1) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     while True: 
      args = q.get() 
      run(idx, *args) 
      q.task_done() 

    for job in jobs: 
     q.put(job) 

    for i in range(0, workers): 
     t = Thread(target=worker, args=[i]) 
     t.daemon = True 
     t.start() 

    q.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

私は(私の労働者が、単に外部プロセスを呼び出すためのものである)マルチプロセッシングを使用する必要はありませんでしたが、これは拡張することができます。マルチプロセッシングのためのAPIがそれにタッチを変更し、ここにあなたが適応することができる方法です:

from multiprocessing import Process, Queue 
from Queue import Empty 
import time 
import random 

def run(idx, *args): 
    time.sleep(random.random() * i) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     try: 
      while True: 
       args = q.get(timeout=1) 
       run(idx, *args) 
     except Empty: 
      return 

    for job in jobs: 
     q.put(job) 

    processes = [] 
    for i in range(0, workers): 
     p = Process(target=worker, args=[i]) 
     p.daemon = True 
     p.start() 
     processes.append(p) 

    for p in processes: 
     p.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

どちらのバージョンが出力何かを好きになるでしょう:

0 : ('job', 0) 
1 : ('job', 2) 
1 : ('job', 6) 
3 : ('job', 3) 
0 : ('job', 5) 
1 : ('job', 7) 
2 : ('job', 1) 
4 : ('job', 4) 
3 : ('job', 8) 
0 : ('job', 9) 
関連する問題