2017-03-05 31 views
1

私はPythonでマルチプロセッシングを可能にするために次のコードを分割しようとしていますが、実際には私にとって不満足な作業になりつつあります。見つけましたが、一度にすべてのCPUコアで動作するソリューションを見つけられませんでした。Pythonでiterableをマルチプロセッシングする

iterablesを四半期に分割し、parrallelでテストを計算したいと思います。

私のシングルスレッドの例:

import itertools as it 
import numpy as np 

wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

check = .915 
result = [] 

for count, (A,B) in enumerate(zip(plines1,plines2)): 
    pass 

    test = (sum(B)+10)/(sum(A)+12) 
    if test > check: 
     result = np.append(result,[A,B]) 
print('results: ',result) 

私は、これは3×3行列のペアの非常に小さな例で実現するが、私は大きい行列のペアにそれを適用したいと、約かかります計算する時間。助言をいただければ幸いです。

+0

まあ、ある意味では、ループの内側から 'result = np.append(result、[A、B])'を取ります。なぜ、 'list'ではなくnumpy配列を使用していますか?このように追加すると配列とリストの関係が非常に非効率になります。奇妙なことに、 'result = []'も使用しています... –

+0

スケーラビリティと効率性について、私はnumpyを使うことに決めました。私が言ったように、3x3行列は例のためだけです。そして、forループは繰り返しですが、何とか取得しない限り、データは保持されません。 –

+0

はい、しかし 'numpy'はあなたのコードを魔法のようにスケーラブルにしません。このように 'numpy'を使うことは、逆の効果があります。 –

答えて

0

イテラブルをダンプするためにキューを使用することをお勧めします。次のようなものがあります。

import multiprocessing as mp 
import numpy as np 
import itertools as it 


def worker(in_queue, out_queue): 
    check = 0.915 
    for a in iter(in_queue.get, 'STOP'): 
     A = a[0] 
     B = a[1] 
     test = (sum(B)+10)/(sum(A)+12) 
     if test > check: 
      out_queue.put([A,B]) 
     else: 
      out_queue.put('') 

if __name__ == "__main__": 
    wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
    pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

    plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
    plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

    # determine length of your iterator 
    counts = 26 

    # setup iterator 
    it = zip(plines1,plines2) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue), daemon=True) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    results = [] 
    control = True 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for idx in range(counts): 
     while out_queue.empty() and control: 
      # fill the queue 
      try: 
       in_queue.put(next(it), block=True) 
      except StopIteration: 
       # signals for processes stop 
       for p in process: 
        print('stopping') 
        in_queue.put('STOP') 
       control = False 
       break 
     results.append(out_queue.get(timeout=10)) 

    # wait for processes to finish 
    for p in process: 
     p.join() 

    print(results) 

    print('finished') 

ただし、タスクリストの長さを最初に決定する必要があります。

+0

私はプロジェクトに実装する前にすべてのコードを理解しようとしていますが、サンプルを実行しようとすると、オブジェクトintが反復可能ではないというエラーが表示されます。 –

+0

彼は不平を言っている行を指摘できますか?おそらく、問題を引き起こしているのは「ワーカー」機能です。あなたの 'test'の簡単なprint-command insteatを入れてみてください – RaJa

+0

私は実際の例やテスト関数を使って答えを修正しました。私のpython 3.5でエラーなく実行されます。 – RaJa

関連する問題