2016-07-06 1 views
1

私は現在、シミュレーション実行にマルチプロセッシングを使用して、同時に異なる入力値を評価しようとしています。Pythonのマルチプロセッシングは、指定されたタスクより多くの結果を返します

したがって、私は最後の数週間でたくさんのグーグルを見つけて、一緒に何かを得ました。おそらくそれほど美しいものではありませんが、それは(何らかの形で)働きます。私の問題は今では、それは私がそれをタスクに与えるよりも多くの出力を返し、私は理由を理解していません。

時には、それぞれのシミュレーション実行で期待される値が1つのみ返されることがありますが、以下の例のように、私は結果を期待します。シミュレーション実行は[23]だけである。また、シミュレーションの実行によって期待されるより多くの出力が生成されます。期間の数を例えば5に増やすと2、それは4つの出力値を生成しますが、私はそれがなぜあるのか理解できません。

誰かが私にそれをどのように変更できるかのヒントを教えてください。私はその答えを見つけることができず、かなり不満を感じています:( 私のコードを改善する方法についての提案は、私がPythonにはかなり新しいので、本当に感謝しています:

import numpy as np 
from multiprocessing import Process, Queue 
import multiprocessing 
from itertools import repeat 

class Simulation(Process): 
    Nr = 1 
    Mean = 5 
    StdDev = 3 
    Periods = 10 
    Result = [] 

    def Generate_Value(self): 
     GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0) 
     return GeneratedValue 

    def runSimulation(self): 
     for i in range(self.Periods): 
      self.Result.append(self.Generate_Value()) 
     return self.Result 

def worker(Mean, stdDev, Periods, Nr, queue): 
    Sim = Simulation() 
    Sim.Nr = Nr 
    Sim.Periods = Periods 
    Sim.Mean = Mean 
    Sim.StdDev = stdDev 
    Results = Sim.runSimulation() 
    queue.put(Results) 
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results) 
      + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")") 

if __name__ == '__main__': 
    m = multiprocessing.Manager() 
    queue = m.Queue() 
    CPUS = multiprocessing.cpu_count() # CPUS = 8 
    WORKERS = multiprocessing.Pool(processes=CPUS) 

    Mean = [50, 60, 70, 80, 90] 
    StdDev = [10, 10, 10, 10, 10] 
    Periods = 1 
    Nr = list(range(1,len(Mean) + 1)) 

    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue))) 
    WORKERS.close() 
    WORKERS.join() 

    FinalSimulationResults = [] 
    for i in range(len(Mean)): 
     FinalSimulationResults.append(queue.get()) 
    print(FinalSimulationResults) 

例えば、その結果:

これは私が使用して単純化されたコードですこの:

Simulation run 1 done with a result of [23] (Input: mean: 50, std. dev.: 10) 
Simulation run 2 done with a result of [55] (Input: mean: 60, std. dev.: 10) 
Simulation run 3 done with a result of [64] (Input: mean: 70, std. dev.: 10) 
Simulation run 5 done with a result of [23, 89] (Input: mean: 90, std. dev.: 10) 
Simulation run 4 done with a result of [78] (Input: mean: 80, std. dev.: 10) 
[[23], [55], [64], [23, 89], [78]] 

それが機能するようになりました:)

import numpy as np 
from multiprocessing import Process, Queue 
import multiprocessing 
from itertools import repeat 

class Simulation(): 
    def __init__(self, Nr, Mean, Std_dev, Periods): 
     self.Result = [] 
     self.Nr = Nr 
     self.Mean = Mean 
     self.StdDev = Std_dev 
     self.Periods = Periods 

    def Generate_Value(self): 
     GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0) 
     return GeneratedValue 

    def runSimulation(self): 
     for i in range(self.Periods): 
      self.Result.append(self.Generate_Value()) 
     return self.Result 

def worker(Mean, stdDev, Periods, Nr, queue): 
    Sim = Simulation(Nr=Nr,Mean=Mean,Std_dev=stdDev,Periods=Periods) 
    Results = Sim.runSimulation() 
    queue.put(Results) 
    print("Simulation run " + str(Nr) + " done with a result of " + str(Results) 
      + " (Input: mean: " + str(Mean) + ", std. dev.: " + str(stdDev) + ")") 

if __name__ == '__main__': 
    start = time.time() 
    m = multiprocessing.Manager() 
    queue = m.Queue() 
    CPUS = multiprocessing.cpu_count() 
    WORKERS = multiprocessing.Pool(processes=CPUS) 

    Mean = [50, 60, 70, 80, 90] 
    StdDev = [10, 10, 10, 10, 10] 
    Periods = 100 
    Nr = list(range(1,len(Mean) + 1)) 

    WORKERS.starmap(worker, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue))) 
    WORKERS.close() 
    WORKERS.join() 

    FinalSimulationResults = [] 
    for i in range(len(Mean)): 
     FinalSimulationResults.append(queue.get()) 

    print(FinalSimulationResults) 
+0

CPUSにはどのような価値がありますか?私は 'Result'リストが何とかクラス属性であるため、属性が漏れていると思います。インスタンス固有の属性を '__init__'関数に入れるべきです。 – syntonym

答えて

1

あなたがクラスに属性を割り当てる方法は属性クラスを行いますないほど速く、私は(8つのコアでのみ2倍の速さ)かもしれませんが、同じ問題を抱えている皆のために予想されるように、ここに私の作業コードです属性。そうすれば、それらはクラスのすべてのインスタンス間で共有されます。あなたのケースでは、すべてのプロセスでクラスのインスタンスが1つしかなく、クラスオブジェクト自体がプロセス間で共有されていないため、これは非常に現実的ではありません。作業者が早期に完了して別のタスクを取得できる場合、クラスオブジェクトは再利用され、クラス属性は「期待どおり」に機能します。

class Simulation(Process): 

    def __init__(self, nr, mean, std_dev, periods): 
     self.nr = nr 
     self.mean = mean 
     self.std_dev = std_dev 
     self.periods = periods 
     self.result = [] 

    def Generate_Value(self): 
     GeneratedValue = max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0) 
     return GeneratedValue 

    def runSimulation(self): 
     for i in range(self.Periods): 
      self.Result.append(self.Generate_Value()) 
     return self.Result 

詳細については、私ドンは言っthe documentation

を参照してください:あなたは、常にインスタンスを割り当てる必要があり、これを回避するために

__init__機能に(インスタンスにインスタンスと異なっている必要がありますすなわち属性)を属性あなたがそれを使用している方法でProcessクラスを使うべきだと思います。 Poolはプロセスの作成を自動的に処理します。処理を指示するだけです。したがって、あなたのコードを書き直してください:

def task(nr, mean, std_dev, periods, results): 
    for i in range(periods): 
     results.append(max(int(round(np.random.normal(self.Mean, self.StdDev), 0)), 0)) 
    return results 


m = multiprocessing.Manager() 
queue = m.Queue() 
cpu_count = multiprocessing.cpu_count() # CPUS = 8 
pool = multiprocessing.Pool(processes=CPUS) 

Mean = [50, 60, 70, 80, 90] 
StdDev = [10, 10, 10, 10, 10] 
Periods = 1 
Nr = list(range(1,len(Mean) + 1)) 

pool.starmap(task, zip(Mean, StdDev, repeat(Periods), Nr, repeat(queue))) 
pool.close() 
pool.join() 

は動作しません(テストされません)。

+0

よかった、ありがとう。それはかなりうまくいった。しかし、私はクラスのソリューションに固執する必要があります。もちろん、単純な関数は簡単ですが、実際のシミュレーションではもっと複雑なので、クラスが必要です。 速度が8ではなく8で改善された理由は分かりますか? – FriendlyGuy

+0

私はまだクラスがプロセスからinerhitすべきではないと思います。あなたは単にプロセスの継承を削除してもまだ動作しますか?私はそれがすべきだと思います。ワークロードが低すぎる場合は、多くのプロセスを作成するオーバーヘッドがゲインよりも高くなる可能性があります。あなたはいくつかの "seriosワークロード"を持っている場合、あなたはまだ第2因子の改善を持っていますか? – syntonym

+0

あなたは何を意味するのかよく分かりません。シミュレーション(プロセス)クラスの代わりにクラスシミュレーション()を書くべきであることを意味しますか? 実際のシミュレーションでは、私は約5,000,000の期間を使用し、8コアのバージョンは1コアの約2〜3倍高速です。 – FriendlyGuy

関連する問題