2

私はpathos.multiprocessingを使用して、インスタンスメソッドを必要とするプログラムを並列化しています。ここでは最小作業例です:どのようにpathos.multiprocessingにキーワードリストを渡すのですか?

import time 
import numpy as np 
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool 

class dummy(object): 
    def __init__(self, arg, key1=None, key2=-11): 

     np.random.seed(arg) 

     randnum = np.random.randint(0, 5) 

     print 'Sleeping {} seconds'.format(randnum) 
     time.sleep(randnum) 

     self.value = arg 
     self.more1 = key1 
     self.more2 = key2 

args = [0, 10, 20, 33, 82] 
keys = ['key1', 'key2'] 
k1val = ['car', 'borg', 'syria', 'aurora', 'libera'] 
k2val = ['a', 'b', 'c', 'd', 'e'] 
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)] 

pool = ThreadingPool(4) 
result = pool.map(dummy, args, k1val, k2val) 

print [[r.value, r.more1, r.more2] for r in result] 

印刷結果は(予想通り)である:

Sleeping 4 seconds 
Sleeping 1 seconds 
Sleeping 3 seconds 
Sleeping 4 seconds 
Sleeping 3 seconds 
[[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']] 

しかし、mapにこの呼び出し最後の2つの引数の問題のために、そして私がしなければ:

result2 = pool.map(dummy, args, k2val, k1val) 

私が取得:

[[0, 'a', 'car'], [10, 'b', 'borg'], [20, 'c', 'syria'], [33, 'd', 'aurora'], [82, 'e', 'libera']] 

ですが、私は最初の結果と同じものを取得したいと思います。振る舞いは標準モジュールmultiprocessingapply_asynckwdsが実行できるものと同じになります。辞書のリストを渡します。各辞書でキーはキーワード名で、項目はキーワード引数です(allks参照)。標準モジュールmultiprocessingはインスタンスメソッドを使用できないため、最小要件を満たしていないことに注意してください。

一応これは次のようになります。 結果= pool.map(ダミー、引数、kwds = allks)#これは私がpathos作者だ

答えて

3

を動作しません。ええ、あなたは私が知っていることにちょっとした仕事が必要です。現在、ProcessPoolThreadPool、およびParallelPoolからmappipe(すなわちapply)メソッドはkwdsを取ることができない - あなたはargsとしてそれらを渡す必要があります。ただし、_ProcessPoolまたは_ThreadPoolを使用する場合は、mapおよびapplyのメソッドにkwdsを渡すことができます。 pathos.poolsのプールはアンダースコアで始まり実際にはmultiprocessから直接来るので、multiprocessingのAPIと同じAPIを持っています(ただし、シリアライゼーションが良く、クラスメソッドなどを渡すことができます)。

>>> from pathos.pools import _ProcessPool 
>>> from multiprocess.pool import Pool 
>>> Pool is _ProcessPool 
True 

ので、元のコードの編集のために、この(OPの提案編集から)のようなものになります。

>>> from pathos.pools import _ThreadPool 
>>> pool = _ThreadPool(4) 
>>> 
[…] 
>>> result = [] 
>>> def callback(x): 
>>> result.append(x) 
>>> 
>>> for a, k in zip(args, allks): 
>>>  pool.apply_async(dummy, args=(a,), kwds=k, callback=callback) 
>>> 
>>> pool.close() 
>>> pool.join() 
+0

心からの特大 '開発に時間と労力をかけてくれてありがとうパスス(pathos)私の特定の問題に対処するための追加の名声。 – astabada

関連する問題