2013-01-08 25 views
5

私は3つのプロセスを起動しており、プロセスに対応するインデックスに文字列を共有配列に入れたいと考えています。以下のコードでPython:マルチプロセッシングとc_char_pの配列

ルックは、生成される出力は次のようになります。

['test 0', None, None] 
['test 1', 'test 1', None] 
['test 2', 'test 2', 'test 2'] 

'テスト0' test 2によりtest 1、およびtest 1によって上書きされますなぜ?私が欲しいもの

は(順序は重要ではありません)です:

['test 0', None, None] 
['test 0', 'test 1', None] 
['test 0', 'test 1', 'test 2'] 

コード:

#!/usr/bin/env python 

import multiprocessing 
from multiprocessing import Value, Lock, Process, Array 
import ctypes 
from ctypes import c_int, c_char_p 

class Consumer(multiprocessing.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      multiprocessing.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr=self.arr, lock=self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr=None, lock=None): 
     with lock: 
      arr[self.i] = "test %d" % self.i 
      print arr[:] 

    def __str__(self): 
     return 'ARC' 

    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    arr = Array(ctypes.c_char_p, 3) 

    lock = multiprocessing.Lock() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

私は、Python 2.7.3(Ubuntuの)を実行しているが

答えて

5

この問題はそうですthis oneに似ています。そこでは、セバスティアンJ.F.は、arr[i]への割り当てが、割り当てを行うサブプロセスにとって意味のあるメモリアドレスに対してarr[i]を指し示すと推測しました。他のサブプロセスは、そのアドレスを見るときにガベージを検索します。

この問題を回避するには、少なくとも2つの方法があります。

import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, lock, lst): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.lock = lock 
      self.lst = lst 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(lock = self.lock, lst = self.lst) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, lock, lst): 
     with lock: 
      lst[self.i] = "test {}".format(self.i) 
      print([lst[i] for i in range(3)]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    manager = mp.Manager() 
    lst = manager.list(['']*3) 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

別の方法はmp.Array('c', 10)ような固定サイズと共用アレイを使用することである:一方はmultiprocessing.managerリストを使用することです。

私は mp.Array(ctypes.c_char_p, 3)は可変サイズを有しているメモリアドレスは、変化したことがないので、 arr[i]があるときに、メモリアドレスが変更される可能性がありますので mp.Array('c', 10)が固定サイズを持っているので mp.Array(ctypes.c_char_p, 3)がないときに、この作品理由は、あると推測
import multiprocessing as mp 

class Consumer(mp.Process): 
    def __init__(self, task_queue, result_queue, arr, lock): 
      mp.Process.__init__(self) 
      self.task_queue = task_queue 
      self.result_queue = result_queue 
      self.arr = arr 
      self.lock = lock 

    def run(self): 
      proc_name = self.name 
      while True: 
       next_task = self.task_queue.get() 
       if next_task is None: 
        self.task_queue.task_done() 
        break    
       answer = next_task(arr = self.arr, lock = self.lock) 
       self.task_queue.task_done() 
       self.result_queue.put(answer) 
      return 

class Task(object): 
    def __init__(self, i): 
     self.i = i 

    def __call__(self, arr, lock): 
     with lock: 
      arr[self.i].value = "test {}".format(self.i) 
      print([a.value for a in arr]) 

if __name__ == '__main__': 
    tasks = mp.JoinableQueue() 
    results = mp.Queue() 
    arr = [mp.Array('c', 10) for i in range(3)] 

    lock = mp.Lock() 
    num_consumers = mp.cpu_count() * 2 
    consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)] 

    for w in consumers: 
     w.start() 

    for i in xrange(3): 
     tasks.put(Task(i)) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    tasks.join() 

より大きな文字列に割り当てられます。

は、おそらくこれはthe docsは、それが述べたときを警告しているものである

共有メモリ内のポインタが、これは特定の のアドレス空間内の位置を参照すること を覚えて保存することも可能ですが

プロセス。ただし、ポインタは2番目のプロセスのコンテキストでは無効である可能性が高く、 からのポインタを逆参照しようとすると、2番目のプロセスでクラッシュする可能性があります。

+0

10億回ありがとうございます!両方のソリューションが実際に動作しています:)私はJ.F. Sebastianの投稿に出会いましたが、何らかの理由でそれを実装できませんでした。今、私はあなたの像をどこに建てるべきか教えてください。もう一度ありがとう... – Ujoux

+0

興味深い質問とあなたの熱意をありがとう!あなたがStackoverflowでもっと見ることを願っています。彫像について - 私はチェックマークの上にあるアップローをクリックするとかなり素晴らしいものになると思います^) – unutbu

+0

私は15の必要な評判を持ってすぐに行います、私は忘れません;) – Ujoux