2010-12-01 12 views
6

誰でも、複数のpythonプロセス間でリストを共有するのを手伝ってもらえますか?問題は、self.ID_Listとself.mps_in_processを次のコードで動作させることです。非常にまもなくリストをpythonマルチプロセッシングで使用する

import time, random 
from multiprocessing import Process #, Manager, Array, Queue 

class MP_Stuff(): 
    def __init__(self, parent, id): 
     time.sleep(1 + random.random()*10) # simulate data processing 
     parent.killMP(id) 

class ParamHandler(): 
    def doFirstMP(self, IDs): 
     self.mps_in_process = [] 
     self.ID_List = IDs 
     id = self.ID_List.pop(0) 
     p = Process(target=MP_Stuff, args=(self, id)) 
     self.mps_in_process.append(id) 
     p.start() 

    def doMP(self): 
     for tmp in range(3): # nr of concurrent processes 
      if len(self.ID_List) > 0: 
       id = self.ID_List.pop(0) 
       p = Process(target=MP_Stuff, args=(self, id)) 
       self.mps_in_process.append(id) 
       p.start() 

    def killMP(self, kill_id): 
     self.mps_in_process.remove(kill_id) 
     self.doMP() 

if __name__ == '__main__': 
    ID_List = [1,2,3,4,5,6] 
    paramSet = ParamHandler() 
    paramSet.doFirstMP(ID_List)

、コードが何をするか、(ここでは、MP_Stuff内のランダムな時間)、いくつかのデータがself.ID_ListにおけるデータIDに基づいて処理されていることです。処理中のデータIDの量を知るために、self.mps_in_processが使用されます(nrプロセスはここではハードコードされていますが、実際はダイナミックです)。

問題は、複数のプロセスにわたってmps_in_processとID_Listを共有することです。現在のコードはかなり無限ループに入ります。子プロセス内のコードの実行がグローバル変数にアクセスしようとする場合

」、それは見ている値が(もしあれば)の値と同じではないかもしれない:何うまくいかないが、実際にもマルチプロセッシングライブラリに記述されていますProcess.start()が呼び出されたときの親プロセス。

しかし、私はmps_in_processとID_Listを動作させる方法を理解できません。要素をmps_in_processから取り出す方法はランダムなので、私はQueueを使うことができません。 .pop(0)は機能しないので、Arrayは使用できません。 .remove()とlen(ID_List)は動作しないので、Manager()。list()を使用することはできません。マルチプロセッシングの代わりにスレッドを使用するのは、後でfreeze_support()を使用する必要があるため、解決策はありません。

したがって、プロセス間でリストを共有する方法は大歓迎です!

答えて

3

Managerは正常に動作しています(len()を含む)。コードの問題は、メインプロセスでは処理が終了するまで待たずに、メインプロセスが終了し、マネージャにアクセスできなくなることです。また、私はListProxyのpopのアトミック性について知らないので、おそらくロックが便利だろう。

解決策は、p.join()です。

の最後にdoFirstMPの部分を入れれば十分だと私は混乱しています。最初のpが返された後ではなく、すべての計算が終了した後に最初のpが返される理由を誰かが説明することができれば幸いです。

マイコード:

import time, random 
from multiprocessing import Process, Manager 

class MP_Stuff(): 
    def __init__(self, parent, id): 
     time.sleep(1 + random.random()*5) # simulate data processing 
     print id , "done" 
     parent.killMP(id) 

class ParamHandler():  
    def doFirstMP(self, IDs): 
     self.mps_in_process = [] 
     self.ID_List = Manager().list(IDs) 
     id = self.ID_List.pop(0) 
     p = Process(target=MP_Stuff, args=(self, id)) 
     self.mps_in_process.append(id) 
     p.start() 
     p.join() 
     print "joined" 

    def doMP(self): 
     for tmp in range(3): # nr of concurrent processes 
      print self.ID_List 
      if len(self.ID_List) > 0: 
       id = self.ID_List.pop(0) 
       p = Process(target=MP_Stuff, args=(self, id)) 
       self.mps_in_process.append(id) 
       p.start() 

    def killMP(self, kill_id): 
     print "kill", kill_id 
     self.mps_in_process.remove(kill_id) 
     self.doMP() 

if __name__ == '__main__': 
    ID_List = [1,2,3,4,5,6] 
    paramSet = ParamHandler() 
    paramSet.doFirstMP(ID_List) 
+0

おかげで、それが動作します。私はdoFirstMPの最後のp.joinは十分だと思う。これは他のすべてのサブプロセスがkillMPの最後にdoMPを呼び出すからだ。 p.joinが完了する前に、__main__も終了していません。実際には、p.joinはdoMPで呼び出されるべきではありません。サブプロセスは並行しないためです。 – bitman

+0

もちろん、いくつかのプロセスプール(またはそれらのリスト)を作成して参加することもできます。私が理解できないことは、最初に生成されたプロセスの終了をブロックするものです。doMPはプロセスを作成してすぐに戻るべきであるからです。 – Krab

0

あなたのオプションは既に指定されています。

少し余分な作業が必要な場合もありますが、Array()Manager().list()の両方で対応できます。

  • あなたはValue()の量を格納し、それをデクリメント/インクリメントすることによりlen(ID_List)をエミュレートすることができます。
  • remove()は、簡単にループでエミュレートすることができますし、その後ろで削除することもできます(遅くなりますが)。
関連する問題