2009-05-27 9 views
11

プロデューサと複数のコンシューマが1人のサーバープログラムを作成しています 私に混乱させるものは、キューに入れた最初のタスクプロデューサだけです が消費され、それらは永久にキューに のままです。pythonマルチプロセッシングのプロデューサ/コンシューマの問題

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

プロデューサーが一度 ユーザからの要求を受信キューにタスクを置くHTTPサーバです。待ち行列に新しいタスクがあると消費者プロセスがまだブロックされていると思われますが、これは奇妙です。

P.S.上記に関係しない別の2つの質問は、私は メインの プロセス以外の独自のプロセスにHTTPサーバーを配置する方が良いかどうかわかりません。はい、すべての前にメインプロセスを実行し続けるにはどうすればいいですか? 子プロセスが終了します。 2番目の質問は、 HTTPサーバーを正常に停止する最良の方法は何ですか?

編集:私は、これは完璧に動作として、Webサーバの一部に異常がなければならないと思います

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

答えて

7

:プロデューサーのコードを追加しますが、それは単純なPythonのWSGIサーバです

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 

出力例:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

恐ろしいあなたがプロデューサー+マルチワーカー例を提供することができればしながら。それはいいだろう。 –

4

"2番目の質問は、HTTPサーバーを正常に停止する最良の方法は何ですか?"

これは難しいです。

  • アウト・オブ・バンドのコントロール:

    あなたはプロセス間通信のための2つの選択肢があります。サーバーには別の通信メカニズムがあります。別のソケット、Unixシグナル、またはその他のもの。それ以外のものは、サーバーのローカルディレクトリにある "stop-now"ファイルです。奇妙だと思われますが、うまくいくし、複数のソケットで受信するための選択ループや、Unis信号を受け取るシグナルハンドラを導入するより簡単です。

    "stop-now"ファイルは簡単に実装できます。 evwsgi.run()ループは、各要求後にこのファイルをチェックするだけです。サーバを停止させるには、ファイルを作成し、/controlリクエスト(500エラーなどが発生します)を実行し、サーバは停止する必要があります。 stop-nowファイルを削除してください。そうしないと、サーバーは再起動しません。

  • 帯域内制御。サーバーには別のURL(/stop)があり、これを停止します。表面的には、これはセキュリティ上の悪夢のようですが、このサーバーがどこでどのように使用されるかによってまったく異なります。これは内部要求キューの周りの単純なラッパーのように見えるので、この余分なURLはうまく機能します。

    この作業を行うには、ループから抜け出すような方法でいくつかの変数を設定することによって終了することができるevwsgi.run()の独自のバージョンを作成する必要があります。

編集

あなたはそれのワーカースレッドの状態を知らないので、あなたはおそらく、あなたのサーバーを終了する必要はありません。あなたはサーバーに信号を送る必要があります。そして、正常に終了するまで待たなければなりません。

強制的にサーバーを強制終了する場合は、os.kill()(またはmultiprocessing.terminate)が動作します。もちろん、あなたは子スレッドが何をしているのか分からない。

+0

サーバーを独自のプロセスに置き、マルチプロセスを使用してプロセスを終了するプロセスを終了しますか?これは簡単です。 – btw0

関連する問題