私は、別のプロセスを実行して、長い時間がかかる作業を行うためのクラスを作成しようとしています。これらのモジュールをメインモジュールから起動し、すべて終了するのを待ちます。私はプロセスを一度起動してから、プロセスを作成して破壊するのではなく、行うべきことを与え続けたいと思っています。たとえば、ddコマンドを実行しているサーバーを10台用意していて、それらをすべてscpファイルにしたい場合があります。Pythonでクラスインスタンスでマルチプロセッシングを使用するには?
最終的な目標は、システムの情報を追跡するクラスを作成することですIPアドレス、ログ、ランタイムなどのようなものに結びついていますが、そのクラスはシステムコマンドを起動し、そのシステムコマンドが実行されている間に呼び出し元に実行を戻すことができなければなりません。 。
パイプを介してクラスのインスタンスメソッドをpickle経由でサブプロセスに送信できないため、私の試行は失敗しています。それらはpickleableではありません。私はそれを様々な方法で修正しようとしましたが、私はそれを理解できません。これを行うために私のコードをどのようにしてパッチすることができますか?何か役に立つものを送ることができないなら、マルチプロセッシングとは何ですか?
クラスインスタンスで使用されているマルチプロセッシングに関する適切な文書はありますか?マルチプロセッシングモジュールを動作させる唯一の方法は、シンプルな機能です。クラスインスタンス内でそれを使用しようとすると、失敗しました。代わりにイベントを渡すべきでしょうか?私はそれをどうやって行うのか分かりません。
import multiprocessing
import sys
import re
class ProcessWorker(multiprocessing.Process):
"""
This class runs as a separate process to execute worker's commands in parallel
Once launched, it remains running, monitoring the task queue, until "None" is sent
"""
def __init__(self, task_q, result_q):
multiprocessing.Process.__init__(self)
self.task_q = task_q
self.result_q = result_q
return
def run(self):
"""
Overloaded function provided by multiprocessing.Process. Called upon start() signal
"""
proc_name = self.name
print '%s: Launched' % (proc_name)
while True:
next_task_list = self.task_q.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % (proc_name)
self.task_q.task_done()
break
next_task = next_task_list[0]
print '%s: %s' % (proc_name, next_task)
args = next_task_list[1]
kwargs = next_task_list[2]
answer = next_task(*args, **kwargs)
self.task_q.task_done()
self.result_q.put(answer)
return
# End of ProcessWorker class
class Worker(object):
"""
Launches a child process to run commands from derived classes in separate processes,
which sit and listen for something to do
This base class is called by each derived worker
"""
def __init__(self, config, index=None):
self.config = config
self.index = index
# Launce the ProcessWorker for anything that has an index value
if self.index is not None:
self.task_q = multiprocessing.JoinableQueue()
self.result_q = multiprocessing.Queue()
self.process_worker = ProcessWorker(self.task_q, self.result_q)
self.process_worker.start()
print "Got here"
# Process should be running and listening for functions to execute
return
def enqueue_process(target): # No self, since it is a decorator
"""
Used to place an command target from this class object into the task_q
NOTE: Any function decorated with this must use fetch_results() to get the
target task's result value
"""
def wrapper(self, *args, **kwargs):
self.task_q.put([target, args, kwargs]) # FAIL: target is a class instance method and can't be pickled!
return wrapper
def fetch_results(self):
"""
After all processes have been spawned by multiple modules, this command
is called on each one to retreive the results of the call.
This blocks until the execution of the item in the queue is complete
"""
self.task_q.join() # Wait for it to to finish
return self.result_q.get() # Return the result
@enqueue_process
def run_long_command(self, command):
print "I am running number % as process "%number, self.name
# In here, I will launch a subprocess to run a long-running system command
# p = Popen(command), etc
# p.wait(), etc
return
def close(self):
self.task_q.put(None)
self.task_q.join()
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(5):
worker = Worker(config, index)
worker.run_long_command("ls /")
workers.append(worker)
for worker in workers:
worker.fetch_results()
# Do more work... (this would actually be done in a distributor in another class)
for worker in workers:
worker.close()
編集:私はWorker
クラスの外でProcessWorker
クラスとマルチプロセッシングキューの作成を移動しようとしたし、手動でワーカーインスタンスをpickle化しようとしました。それでも動作しないとエラーが発生します。
RuntimeError: Queue objects should only be shared between processes through inheritance
しかし、これらのキューの参照をワーカーインスタンスに渡すだけですか?私は何か基本的なことを忘れているここでの主なセクションから修正されたコードである。代わりにメソッド自身を送信しようとする
if __name__ == '__main__':
config = ["some value", "something else"]
index = 7
workers = []
for i in range(1):
task_q = multiprocessing.JoinableQueue()
result_q = multiprocessing.Queue()
process_worker = ProcessWorker(task_q, result_q)
worker = Worker(config, index, process_worker, task_q, result_q)
something_to_look_at = pickle.dumps(worker) # FAIL: Doesn't like queues??
process_worker.start()
worker.run_long_command("ls /")
['dispy'](http://dispy.sourceforge.net/)は見ましたか?頭痛や二人を救うかもしれません:) –
私はdispyのクラスを使った例は見つかりませんでした。すべてが__main__から実行されているように見えますが、それは私がそれを使う方法ではありません。私の例では、マルチプロセッシングを使用しています。プロセスは__main__でうまくいきましたが、状態でクラスとメソッドを使用しようとすると失敗します –
これはゲームの遅れですが、 'pathos.multiprocessing'クラスインスタンスを簡単にpickleできます。 'Queue'オブジェクトとそれ以外のものを使用する必要がある場合は、' import from Queue'をインポートして、拡張されたforkされた 'Queues'にアクセスできます。 'pathos.multiprocessing'は' dill'を使います。これは**クラス定義とインスタンスを直列化して送ります。 –