2009-04-11 7 views
2

私はPythonのマルチプロセッシングに関する質問があります。私はデータセットを取得し、チャンクに分割し、それらのチャンクを同時に実行中のプロセスに渡そうとしています。私は、単純な計算(例えば、電気抵抗 - >サーミスタの温度)を使用して、大きなデータテーブルを変換する必要があります。Pythonの動的プロセス

以下のコードは、ほとんどの場合、必要に応じて動作しますが、新しいプロセスを生成するようには見えません(または、一度に1つしかない場合)。私はPythonには新しいので、おそらくこの問題に対する簡単な解決策があります。

ありがとうございます!

from multiprocessing import Process 

class Worker(Process): 
    # example data transform 
    def process(self, x): return (x * 2)/3 

    def __init__(self, list): 
     self.data = list 
     self.result = map(self.process, self.data) 
     super(Worker, self).__init__() 

if __name__ == '__main__': 
    start = datetime.datetime.now() 
    dataset = range(10000) # null dataset 
    processes = 3 

    for i in range(processes): 
     chunk = int(math.floor(len(dataset)/float(processes))) 

     if i + 1 == processes: 
      remainder = len(dataset) % processes 
     else: remainder = 0 

     tmp = dataset[i * chunk : (i + 1) * chunk + remainder] 
     exec('worker'+str(i)+' = Worker(tmp)') 
     exec('worker'+str(i)+'.start()') 

    for i in range(processes): 
     exec('worker'+str(i)+'.join()') 
     # just a placeholder to make sure the initial values of the set are as expected 
     exec('print worker'+str(i)+'.result[0]')

答えて

1

各プロセスにチャンクの番号を送信する必要はありませんが、ちょうど(get_nowait使用)と、最終的Queue.Empty例外を処理します。すべてのプロセスが異なるCPU時間を取得するため、すべてのプロセスが忙しい状態に保たれます。

import multiprocessing, Queue 

class Worker(multiprocessing.Process): 
    def process(self, x): 
     for i in range(15): 
      x += (float(i)/2.6) 
     return x 

    def __init__(self, input, output): 
     self.input = input 
     self.output = output 
     super(Worker, self).__init__() 

    def run(self): 
     try: 
      while True: 
       self.output.put(self.process(self.input.get_nowait())) 
     except Queue.Empty: 
      pass 


if name == 'main': 
    dataset = range(10) 
    processes = multiprocessing.cpu_count() 
    input = multiprocessing.Queue() 
    output = multiprocessing.Queue() 

    for obj in dataset: 
     input.put(obj) 
    for i in range(processes): 
     Worker(input, output).start() 

    for i in range(len(dataset)): 
     print output.get() 
1

あなたはrunメソッドをオーバーライドしていません。

  1. runメソッドをオーバーライドし、ターゲットを
  2. サブクラスプロセスを指定するプロセスを作成します。それは、コードを実行させるプロセス(またはスレッド)を有する2つの方法があります。

オーバーライド__init__は、あなたのプロセスがどこにも行かないことを意味します。これは、実行する必要のある属性を実行するために使用する必要がありますが、実行するタスクを指定するべきではありません。あなたのコードで

、すべての重い物を持ち上げるには、この行で行われます。

exec('worker'+str(i)+' = Worker(tmp)') 

と何もここで行われていない:

exec('worker'+str(i)+'.start()') 

のでexec('print worker'+str(i)+'.result[0]')で結果を確認することはあなたに意味のある何かを与える必要があり、ただし、実行するコードにはが実行されていますが、プロセスの開始時ではなくプロセスの構築時に実行されているためです。

class Worker(Process): 
    # example data transform 
    def process(self, x): return (x * 2)/3 

    def __init__(self, list): 
     self.data = list 
     self.result = [] 
     super(Worker, self).__init__() 

    def run(self): 
     self.result = map(self.process, self.data) 

はEDIT:

わかりました...ので、私はちょうどここに私のスレッド本能に基づいて飛んでいた、そして、彼らはすべて間違っていた

はこれを試してみてください。私たちがどちらもプロセスについて理解していなかったのは、変数を直接共有できないということです。新しいプロセスを開始するために何を渡しても、それは永久に読み込まれ、コピーされ、消え去ります。データを共有する2つの標準的な方法のいずれかを使用しない限り:queues and pipes。私はあなたのコードを動作させるためにちょっと試したことがありますが、これまでのところ運がありません。私はあなたが正しい軌道に乗ると思う。

+0

お返事ありがとうございます!しかし、Pythonは実行時に 'IndexError:list index of range'を投げています。さらに検査すると、workerX.resultが空のリストであることがわかります。私はまだマルチプロセッシングライブラリで何か不足しているようです。 –

+0

__init__からself.result = []を削除します。 AttributeErrorを取得した場合、問題はサブプロセスを呼び出すことにあります。 IndexErrorを取得した場合、問題はデータセットにデータを取り込むことです。 print文を実行してみてください。 –

0

これで、リストがスレッドセーフではないように見えますが、キューを使用するように移動しました(ただし、はるかに遅いようです)。このコードは、基本的に私が何をしようとしていたものを実現:

import math, multiprocessing 

class Worker(multiprocessing.Process): 
    def process(self, x): 
     for i in range(15): 
      x += (float(i)/2.6) 
     return x 

    def __init__(self, input, output, chunksize): 
     self.input = input 
     self.output = output 
     self.chunksize = chunksize 
     super(Worker, self).__init__() 

    def run(self): 
     for x in range(self.chunksize): 
      self.output.put(self.process(self.input.get())) 


if __name__ == '__main__': 
    dataset = range(10) 
    processes = multiprocessing.cpu_count() 
    input = multiprocessing.Queue() 
    output = multiprocessing.Queue() 

    for obj in dataset: 
     input.put(obj) 

    for i in range(processes): 
     chunk = int(math.floor(len(dataset)/float(processes))) 
     if i + 1 == processes: 
      remainder = len(dataset) % processes 
     else: remainder = 0 

     Worker(input, output, chunk + remainder).start() 

    for i in range(len(dataset)): 
     print output.get() 
+0

はい、キュー(またはパイプ)を使用するとシリアル化コストが発生します。それは遅くなりますが、それは正しいことです。 – jnoller

関連する問題