2011-08-10 2 views
26

マルチプロセッシングモジュールでPool.map_async()およびPool.map()を使用する際に問題が発生しています。 Pool.map_asyncへの関数入力が「通常の」関数である限り、うまく動作する並列forループ関数を実装しました。機能が例えば、クラスのメソッドは、その後、私はPicklingErrorを得る:私は酸洗の概念にそれほど慣れていないよのでマルチプロセッシング使用時のPicklingError

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

私は科学技術計算のためのPythonを使用して、ちょうど今日それについて少し学びました。私はCan't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()のような以前の回答をいくつか見てきましたが、答えに提供されたリンクをたどっても、それを動作させる方法を理解することはできません。

私のコードは、複数のコアを使用してNormal r.vのベクトルをシミュレートすることでした。これは単なる例であり、多分複数のコア上で実行することはできません。 Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()での質問への答えで提供されるリンクに続き

import multiprocessing as mp 
import scipy as sp 
import scipy.stats as spstat 

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None): 
    """ 
    Purpose: Evaluate function using Multiple cores. 

    Input: 
     func  - Function to evaluate in parallel 
     arg  - Array of arguments to evaluate func(arg) 
     static_arg - The "static" argument (if any), i.e. the variables that are  constant in the evaluation of func. 
     nWorkers - Number of Workers to process computations. 
    Output: 
     func(i, static_arg) for i in args. 

    """ 
    # Prepare arguments for func: Collect arguments with static argument (if any) 
    if static_arg != None: 
     arguments = [[arg] + static_arg for arg in list(args)] 
    else: 
     arguments = args 

    # Initialize workers 
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function 
    result = pool.map_async(func, arguments, chunksize = chunksize) 
    pool.close() 
    pool.join() 

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator. 
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled 
# so this will give an error. 
def genNorm(loc, scale): 
    def subfunc(a): 
     return spstat.norm.rvs(loc = loc, scale = scale, size = a) 
    return subfunc 

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled 
def test(fargs): 
    x, a, b = fargs 
    return spstat.norm.rvs(size = x, loc = a, scale = b) 

# Try it out. 
N = 1000000 

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector. 
args1 = sp.ones(N) 
static_arg = [0, 1] # standarized normal. 

# This gives the PicklingError 
func = genNorm(*static_arg) 
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None) 

# This is OK: 
func = test 
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None) 

、(ほとんど終わり)スティーブンBethardはcopy_regモジュールを使用することを提案しています。彼のコードは

def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 

import copy_reg 
import types 

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 

私はこれをどのように利用できるのか分かりません。私が思いつくことができるのは、自分のコードの直前に置くことでしたが、それは助けになりませんでした。単純な解決策はもちろん、動作するものだけでcopy_regに関わることを避けることです。私は、毎回問題を回避することなく、マルチプロセッシングを十分に活用するためにcopy_regを正しく動作させることにもっと興味を持っています。

ご協力いただきありがとうございます。

マティアス

答えて

19

ここでの問題は、概念よりも「漬物」のエラーメッセージの小さい: マルチは その魔法を実行するために「労働者」とは、異なるプロセスでコードをforkしません。

次に、データ(ピクルを使用する部分)をシームレスにシリアライズおよびデシリアライズすることによって、異なるプロセスとの間でデータを送信します。

データの一部が関数として渡されたとき、同じ名前の関数が呼び出し先プロセスに存在するとみなされ、(おそらく)関数名が文字列として渡されます。ファンクションはステートレスなので、呼び出されたワーカープロセスは受信したデータで同じファンクションを呼び出します。 (pickleでPython関数をシリアライズすることはできませんので、参照はマスタプロセスとワーカープロセスの間で受け渡しされます)

関数がインスタンス内のメソッドである場合 - たとえPythonをコーディングしても、関数と同じもの、 "自動" self変数、それは同じものではありません。インスタンス(オブジェクト)はステートフルなのでつまり、ワーカープロセスには、相手側で呼び出すメソッドの所有者であるオブジェクトのコピーがありません。

あなたのメソッドを関数としてmap_async呼び出しに渡す方法については、マルチプロセスは関数参照を使用するだけで、実際の関数を渡すことはありません。

したがって、(1)コードを変更して、メソッドではなく関数をワーカープロセスに渡し、オブジェクトが保持する状態を呼び出して新しいパラメータに変換する必要があります。 (2)ワーカープロセス側で必要なオブジェクトを再構築し、内部の関数を呼び出すmap_async呼び出しの "target"関数を作成します。 Pythonの最も単純なクラスは、自分自身で選択可能なので、map_async呼び出しで関数所有者自身のオブジェクトを渡すことができます。また、 "target"関数はワーカー側で適切なメソッド自体を呼び出します。

(2)「困難」に聞こえるかもしれないが、それはこのようなだけで何かおそらくです - あなたのオブジェクトのクラスが漬けすることができない場合を除き:

import types 

def target(object, *args, **kw): 
    method_name = args[0] 
    return getattr(object, method_name)(*args[1:]) 
(...)  
#And add these 3 lines prior to your map_async call: 


    # Evaluate function 
    if isinstance (func, types.MethodType): 
     arguments.insert(0, func.__name__) 
     func = target 
    result = pool.map_async(func, arguments, chunksize = chunksize) 

*免責事項:私はこの

+0

をテストしていませんあなたの答えをありがとう。私は質問があり、あなたが答えることができれば非常に感謝するでしょう: 1.あなたは言う:「(1)あなたのコードを変更して、メソッドではなく関数をワーカープロセスに渡す。 " これは私の2回目の試行で、つまりtest()関数を使って何をしているのですか?私の質問です:もし私が関数を渡していない場合、それはどのように機能するのでしょうか?私は将来のバグに出くわすことができるのですか? 私はあなたのコードを試しましたが、それも機能しましたが、私の最初の選択肢がすでに働いていれば、「複雑になる」という点は見られません。 – matiasq

+0

私の主な問題は、私が使っているクラスが選択できないということで、あなたの選択肢(2)が私のために働かないことを指摘したいと思います。私はcopy_regを使用してこの問題を回避しようとしていました。スティーブ・ベタードが投稿した2番目のコードを使用して以来可能であったはずです。 もう一度、ありがとうございます。 – matiasq

+0

私の最初の投稿に関して、私は間違っていました。私はあなたのコードを書いていましたが、 "if isinstance(func、types.MethodType):"という言葉が真実でないので、コードが実行されなかったので何の効果もありませんでした。これに気づかなかったことを謝ります。 – matiasq

関連する問題