2013-01-05 14 views
21

私は、別のプロセスを実行して、長い時間がかかる作業を行うためのクラスを作成しようとしています。これらのモジュールをメインモジュールから起動し、すべて終了するのを待ちます。私はプロセスを一度起動してから、プロセスを作成して破壊するのではなく、行うべきことを与え続けたいと思っています。たとえば、ddコマンドを実行しているサーバーを10台用意していて、それらをすべてscpファイルにしたい場合があります。Pythonでクラスインスタンスでマルチプロセッシングを使用するには?

最終的な目標は、システムの情報を追跡するクラスを作成することですIPアドレス、ログ、ランタイムなどのようなものに結びついていますが、そのクラスはシステムコマンドを起動し、そのシステムコマンドが実行されている間に呼び出し元に実行を戻すことができなければなりません。 。

パイプを介してクラスのインスタンスメソッドをpickle経由でサブプロセスに送信できないため、私の試行は失敗しています。それらはpickleableではありません。私はそれを様々な方法で修正しようとしましたが、私はそれを理解できません。これを行うために私のコードをどのようにしてパッチすることができますか?何か役に立つものを送ることができないなら、マルチプロセッシングとは何ですか?

クラスインスタンスで使用されているマルチプロセッシングに関する適切な文書はありますか?マルチプロセッシングモジュールを動作させる唯一の方法は、シンプルな機能です。クラスインスタンス内でそれを使用しようとすると、失敗しました。代わりにイベントを渡すべきでしょうか?私はそれをどうやって行うのか分かりません。

import multiprocessing 
import sys 
import re 

class ProcessWorker(multiprocessing.Process): 
    """ 
    This class runs as a separate process to execute worker's commands in parallel 
    Once launched, it remains running, monitoring the task queue, until "None" is sent 
    """ 

    def __init__(self, task_q, result_q): 
     multiprocessing.Process.__init__(self) 
     self.task_q = task_q 
     self.result_q = result_q 
     return 

    def run(self): 
     """ 
     Overloaded function provided by multiprocessing.Process. Called upon start() signal 
     """ 
     proc_name = self.name 
     print '%s: Launched' % (proc_name) 
     while True: 
      next_task_list = self.task_q.get() 
      if next_task is None: 
       # Poison pill means shutdown 
       print '%s: Exiting' % (proc_name) 
       self.task_q.task_done() 
       break 
      next_task = next_task_list[0] 
      print '%s: %s' % (proc_name, next_task) 
      args = next_task_list[1] 
      kwargs = next_task_list[2] 
      answer = next_task(*args, **kwargs) 
      self.task_q.task_done() 
      self.result_q.put(answer) 
     return 
# End of ProcessWorker class 

class Worker(object): 
    """ 
    Launches a child process to run commands from derived classes in separate processes, 
    which sit and listen for something to do 
    This base class is called by each derived worker 
    """ 
    def __init__(self, config, index=None): 
     self.config = config 
     self.index = index 

     # Launce the ProcessWorker for anything that has an index value 
     if self.index is not None: 
      self.task_q = multiprocessing.JoinableQueue() 
      self.result_q = multiprocessing.Queue() 

      self.process_worker = ProcessWorker(self.task_q, self.result_q) 
      self.process_worker.start() 
      print "Got here" 
      # Process should be running and listening for functions to execute 
     return 

    def enqueue_process(target): # No self, since it is a decorator 
     """ 
     Used to place an command target from this class object into the task_q 
     NOTE: Any function decorated with this must use fetch_results() to get the 
     target task's result value 
     """ 
     def wrapper(self, *args, **kwargs): 
      self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled! 
     return wrapper 

    def fetch_results(self): 
     """ 
     After all processes have been spawned by multiple modules, this command 
     is called on each one to retreive the results of the call. 
     This blocks until the execution of the item in the queue is complete 
     """ 
     self.task_q.join()       # Wait for it to to finish 
     return self.result_q.get()     # Return the result 

    @enqueue_process 
    def run_long_command(self, command): 
     print "I am running number % as process "%number, self.name 

     # In here, I will launch a subprocess to run a long-running system command 
     # p = Popen(command), etc 
     # p.wait(), etc 
     return 

    def close(self): 
     self.task_q.put(None) 
     self.task_q.join() 

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(5): 
     worker = Worker(config, index) 
     worker.run_long_command("ls /") 
     workers.append(worker) 
    for worker in workers: 
     worker.fetch_results() 

    # Do more work... (this would actually be done in a distributor in another class) 

    for worker in workers: 
     worker.close() 

編集:私はWorkerクラスの外でProcessWorkerクラスとマルチプロセッシングキューの作成を移動しようとしたし、手動でワーカーインスタンスをpickle化しようとしました。それでも動作しないとエラーが発生します。

RuntimeError: Queue objects should only be shared between processes through inheritance

しかし、これらのキューの参照をワーカーインスタンスに渡すだけですか?私は何か基本的なことを忘れているここでの主なセクションから修正されたコードである。代わりにメソッド自身を送信しようとする

if __name__ == '__main__': 
    config = ["some value", "something else"] 
    index = 7 
    workers = [] 
    for i in range(1): 
     task_q = multiprocessing.JoinableQueue() 
     result_q = multiprocessing.Queue() 
     process_worker = ProcessWorker(task_q, result_q) 
     worker = Worker(config, index, process_worker, task_q, result_q) 
     something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues?? 
     process_worker.start() 
     worker.run_long_command("ls /") 
+0

['dispy'](http://dispy.sourceforge.net/)は見ましたか?頭痛や二人を救うかもしれません:) –

+2

私はdispyのクラスを使った例は見つかりませんでした。すべてが__main__から実行されているように見えますが、それは私がそれを使う方法ではありません。私の例では、マルチプロセッシングを使用しています。プロセスは__main__でうまくいきましたが、状態でクラスとメソッドを使用しようとすると失敗します –

+0

これはゲームの遅れですが、 'pathos.multiprocessing'クラスインスタンスを簡単にpickleできます。 'Queue'オブジェクトとそれ以外のものを使用する必要がある場合は、' import from Queue'をインポートして、拡張されたforkされた 'Queues'にアクセスできます。 'pathos.multiprocessing'は' dill'を使います。これは**クラス定義とインスタンスを直列化して送ります。 –

答えて

8

が(非実用的である)、実行するメソッドの名前を送信してみてください。

各作業者が同じコードを実行する場合は、単純な問題getattr(self, task_name)です。私はtask_argsが直接作業方法に供給するのdictたタプル(task_name, task_args)、渡したい

:だから

next_task_name, next_task_args = self.task_q.get() 
if next_task_name: 
    task = getattr(self, next_task_name) 
    answer = task(**next_task_args) 
    ... 
else: 
    # poison pill, shut down 
    break 
+1

これは動作しません... "AttributeError: 'ProcessWorker'オブジェクトに 'run_long_command'属性がありません"というエラーが表示されます。ProcessWorkerにはWorkerクラスに存在するメソッドが1つもないので、これはうまくいくとは思っていません。リモートプロセスがすべての状態情報を利用できるように、パイプ(状態情報付き)を介してメソッドを送信したいと思います。私が本当にマルチプロセッサモジュールのポイントを見ないのは、相手側でステートレスファンクションを実行するだけのことです。 –

+2

申し訳ありませんが、私は繰り返しなければなりません。パイプ上でメソッドを送信できません。これが 'pickle'がそれに不平を言う理由です。実行可能コードを送信することは不可能ではありませんが、コードオブジェクトをデシリアライズするだけではさらに多くの作業が必要になります。あらかじめWorkerクラスで実行したいメソッドを実装する必要があります。事前に分からないコードを送る必要がある場合は、Pythonのソースを文字列として送信し、その上で 'compile'と' eval'を呼び出してください。状態を持つメソッドを送信する場合は、すべての状態をメソッドの引数に入れるか、共有データベースを使用します。 – 9000

+0

ステートレスメソッドを実行するWRT:状態を保持できるパイプがあります。最初の状態をいくつかのプロセスに分割し、結果を収集します。高度に共有された状態(レイトレーシングのジオメトリなど)が必要な場合は、memcachedから通常のRDBMSまで(メモリ内の)データベースを使用します。 global _mutable_状態を使用するのは、通常、悪い考えです。必要な場合は、パイプから読み取り、競合(データベースなど)を解決するアービタープロセスを使用します。 – 9000

21

を、問題は、私はPythonがその魔法のいくつかの並べ替えをしていたと仮定したということでしたC++/fork()が動作する方法とは多少異なっています。私は何とかして、Pythonはクラス全体を別のプロセスにコピーするのではなく、プログラム全体をコピーしただけだと考えました。私は、ピクルのシリアル化についてのすべての話が実際にパイプ上のすべてを送信したと思うように思ったので、これを動作させるために何日も真剣に無駄にしました。私はある種のものをパイプに送ることができないことを知っていましたが、私は自分の問題は私が適切に物を梱包していないということだと思っていました。

このモジュールが使用されたときに何が起こるかをPythonのドキュメントが私に10,000フィートの視点で教えてくれれば、これはすべて回避できました。もちろん、マルチプロセスモジュールのメソッドが何をしているのか、私にはいくつかの基本的な例がありますが、私が知りたいのは、その背後にある "操作の理論"です。ここで私が使った情報の種類があります。私の答えがオフであればチャイムしてください。それは私が学ぶのを助けるでしょう。

このモジュールを使用してプロセスを開始すると、プログラム全体が別のプロセスにコピーされます。しかし、それは "__main__"プロセスではないので、私のコードがそれをチェックしていたので、それは無限に別のプロセスを起動しません。ゾンビのように何かを待っているだけで止まり、そこに座っている。 multiprocess.Process()を呼び出すときに親で初期化されたものはすべてセットアップされています。いったんマルチプロセスに何かを置くと、キューや共有メモリ、パイプなど(通信していますが)、別のプロセスがそれを受け取り、動作します。インポートされたすべてのモジュールを取得し、あたかもそれが親であるかのように設定できます。ただし、親プロセスまたは別プロセスで内部状態変数が変更されると、それらの変更が分離されます。プロセスが生成されたら、必要に応じてキュー、パイプ、共有メモリなどを使ってそれらを同期させておくことがあなたの仕事になります。

私はコードを外して始めましたが、 ProcessWorkerにある1つの余分な機能、コマンドラインを実行する "実行"メソッド。ものすごく単純。私は、このようなプロセスの起動と終了を心配する必要はありません。これは、過去にC++であらゆる種類の不安定性とパフォーマンスの問題を引き起こしました。私が最初にプロセスを起動し、そのプロセスにメッセージを渡すと、パフォーマンスが向上し、非常に安定していました。例では、私は方法は、キューを横切って輸送されていたことを考えさせられましたので、

ところで、私は私を投げた、助けを得るために、このリンクを見て:http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 使用される最初のセクションの第二の例「NEXT_TASK()」というキューを介して受け取ったタスクを実行するように(私に)現れました。

+1

あなたの質問に対する私のコメントで述べたように、依存関係について心配することなくクラスインスタンスをピクルしたい場合は、 'dill'を使うべきです。クラスインスタンスでの定義、*または*ユーザー定義クラスを含むほとんどのオブジェクトのソースコードと依存関係を削除します。 'multiprocessing'のフォーク(問題のコメントで言及されています)は、' dill'をシリアライゼーションに使用します。あなたが記述している問題のほとんどを避けます。 –

0

REF:彼は彼は http://www.doughellmann.com/PyMOTW/multiprocessing/communication.htmlに惑わされたことを言うときデヴィッド・リンチによる6時03分で、1月6日にhttps://stackoverflow.com/a/14179779

回答は事実上正しくありません。

提供されているコードや例は、正しいものであり、広告された通りに動作します。 next_task()です。キュー経由で受信したタスクを実行します。Task.__call__()メソッドが何をしているのかを理解してください。

私のケースでは、run()の実装で構文エラーが発生しました。サブプロセスはこれを報告せず、黙って失敗します。たとえば、ある種の構文チェッカーが動作していることを確認してください。 EmacsのFlymake/Pyflakes

multiprocessing.log_to_stderr() Fを介したデバッグは、問題の絞り込みに役立ちました。

関連する問題