マルチプロセッシングモジュールでPool.map_async()およびPool.map()を使用する際に問題が発生しています。 Pool.map_asyncへの関数入力が「通常の」関数である限り、うまく動作する並列forループ関数を実装しました。機能が例えば、クラスのメソッドは、その後、私はPicklingErrorを得る:私は酸洗の概念にそれほど慣れていないよのでマルチプロセッシング使用時のPicklingError
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
私は科学技術計算のためのPythonを使用して、ちょうど今日それについて少し学びました。私はCan't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()のような以前の回答をいくつか見てきましたが、答えに提供されたリンクをたどっても、それを動作させる方法を理解することはできません。
私のコードは、複数のコアを使用してNormal r.vのベクトルをシミュレートすることでした。これは単なる例であり、多分複数のコア上で実行することはできません。 Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()での質問への答えで提供されるリンクに続き
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
、(ほとんど終わり)スティーブンBethardはcopy_regモジュールを使用することを提案しています。彼のコードは
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
私はこれをどのように利用できるのか分かりません。私が思いつくことができるのは、自分のコードの直前に置くことでしたが、それは助けになりませんでした。単純な解決策はもちろん、動作するものだけでcopy_regに関わることを避けることです。私は、毎回問題を回避することなく、マルチプロセッシングを十分に活用するためにcopy_regを正しく動作させることにもっと興味を持っています。
ご協力いただきありがとうございます。
マティアス
をテストしていませんあなたの答えをありがとう。私は質問があり、あなたが答えることができれば非常に感謝するでしょう: 1.あなたは言う:「(1)あなたのコードを変更して、メソッドではなく関数をワーカープロセスに渡す。 " これは私の2回目の試行で、つまりtest()関数を使って何をしているのですか?私の質問です:もし私が関数を渡していない場合、それはどのように機能するのでしょうか?私は将来のバグに出くわすことができるのですか? 私はあなたのコードを試しましたが、それも機能しましたが、私の最初の選択肢がすでに働いていれば、「複雑になる」という点は見られません。 – matiasq
私の主な問題は、私が使っているクラスが選択できないということで、あなたの選択肢(2)が私のために働かないことを指摘したいと思います。私はcopy_regを使用してこの問題を回避しようとしていました。スティーブ・ベタードが投稿した2番目のコードを使用して以来可能であったはずです。 もう一度、ありがとうございます。 – matiasq
私の最初の投稿に関して、私は間違っていました。私はあなたのコードを書いていましたが、 "if isinstance(func、types.MethodType):"という言葉が真実でないので、コードが実行されなかったので何の効果もありませんでした。これに気づかなかったことを謝ります。 – matiasq