2013-05-21 8 views
17

私はmultiprocessing.Poolmultiprocessing.Queueを使って、Pythonでプロデューサ - コンシューマパターンを実装しています。コンシューマーは、複数のタスクを起動するためにgeventを使用する事前フォークされたプロセスです。ここで 無限ループ内のキューに接続されたPythonマルチプロセッシングワーカーを停止する最もクリーンな方法は何ですか?

は、コードのバージョンダウンにトリミングされています。今、私はそれを終了しようとすると、

import gevent 
from Queue import Empty as QueueEmpty 
from multiprocessing import Process, Queue, Pool 
import signal 
import time 

# Task queue 
queue = Queue() 

def init_worker(): 
    # Ignore signals in worker 
    signal.signal(signal.SIGTERM, signal.SIG_IGN) 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 
    signal.signal(signal.SIGQUIT, signal.SIG_IGN) 

# One of the worker task 
def worker_task1(): 
    while True: 
     try: 
      m = queue.get(timeout = 2) 

      # Break out if producer says quit 
      if m == 'QUIT': 
       print 'TIME TO QUIT' 
       break 

     except QueueEmpty: 
      pass 

# Worker 
def work(): 
    gevent.joinall([ 
     gevent.spawn(worker_task1), 
    ]) 

pool = Pool(2, init_worker) 
for i in xrange(2): 
    pool.apply_async(work) 

try: 
    while True: 
     queue.put('Some Task') 
     time.sleep(2) 

except KeyboardInterrupt as e: 
    print 'STOPPING' 

    # Signal all workers to quit 
    for i in xrange(2): 
     queue.put('QUIT') 

    pool.join() 

を、私は、次の取得状態:

  1. 親プロセスが参加する子の1つを待っています。
  2. 子供の1人は無効です。だから終わったが、親は他の子供が終わるのを待っている。
  3. 他の子供が表示されています:futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...

このようなプロセスを正常に終了するにはどうすればよいですか?

答えて

12

問題を見つけました。 documentation for multiprocessing.Pool.join()によれば、join()edとなる前にpoolclose()edである必要があります。 pool.join()の前にpool.close()を追加すると問題が解決しました。

関連する問題