2017-02-05 151 views
4

concurrent.futures.ProcessPoolExecutor.map()は、2つ以上の引数からなる関数を呼びたいと思います。以下の例では、lambda関数を使用し、同じ値を持つnumberlistと等しいサイズの配列としてrefを定義しています。複数の引数を持つ関数をpython concurrent.futures.ProcessPoolExecutor.map()に渡すにはどうすればいいですか?

最初の質問:これを行うより良い方法はありますか? numberlistのサイズが数百万〜十億の要素になる可能性がある場合、refのサイズはnumberlistに従わなければならないので、このアプローチは不必要に貴重なメモリを消費します。これは、map関数を読んで、最短配列の終わりに達するまでそのマッピングを終了させるためです。上記のコードを実行する

import concurrent.futures as cf 

nmax = 10 
numberlist = range(nmax) 
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5] 
workers = 3 


def _findmatch(listnumber, ref):  
    print('def _findmatch(listnumber, ref):') 
    x='' 
    listnumber=str(listnumber) 
    ref = str(ref) 
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref)) 
    if ref in listnumber: 
     x = listnumber 
    print('x = {0}'.format(x)) 
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref) 
for n in a: 
    print(n) 
    if str(ref[0]) in n: 
     print('match') 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    #for n in executor.map(_findmatch, numberlist): 
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref): 
     print(type(n)) 
     print(n) 
     if str(ref[0]) in n: 
      print('match') 

、私はmap機能が私の望ましい結果を達成することができたことがわかりました。

Traceback (most recent call last): 
    File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed 
    obj = ForkingPickler.dumps(obj) 
    File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps 
    cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed 

質問2:私はconcurrent.futures.ProcessPoolExecutor.map()に同じ条件を転送するときしかし、python3.5は、このエラーで失敗しましたなぜこのエラーが発生したとどのように私は、同時入手できますか.futures.ProcessPoolExecutor.map()は複数の引数を持つ関数を呼び出しますか?

答えて

3

最初の2つ目の質問に答えるために、あなたは例外を取得しています。 Pythonはpickleプロトコルを使用して、メインプロセスとProcessPoolExecutorのワーカープロセス間で渡されるデータをシリアル化するため、この問題が発生します。なぜlambdaを使用しているのかはっきりしません。あなたが持っていたラムダは、元の関数と同じように2つの引数をとります。 lambdaの代わりに_findmatchを直接使用できます。

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(_findmatch, numberlist, ref): 
     ... 

巨大なリストを作成せずに2番目の定数引数を渡すことに関する最初の問題については、いくつかの点でこれを解決できます。1つのアプローチは、反復処理時に同じ値を永久に繰り返す反復可能オブジェクトを作成するためにitertools.repeatを使用することです。

しかし、よりよいアプローチは、あなたのために定数引数を渡す特別な関数を書くことでしょう。 (おそらくこれはあなたがlambda機能を使用しようとしていた理由は?)それはあなたが使用する関数はモジュールのトップレベルの名前空間でアクセス可能である場合に動作するはずです:

def _helper(x): 
    return _findmatch(x, 5) 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(_helper, numberlist): 
     ... 
+0

あなたは ''ラムダ 'を実験することに頼っていました。なぜなら、 'ref'が定数だったとき、2つの引数を持つ関数を' executor'に渡すことに問題があったからです。 'ref'を' numberlist'と同じサイズのリストに変換した後、私はラムダを削除するのを忘れてしまいました。私が本当に欲しかったのは、 'ref'が定数であるか類似している解決策でした。したがって、あなたが言及したヘルパー関数&itertools.repeatは働きました。 –

+0

私は、[ExecutorMap]のパフォーマンスを 'Executor.submit'でベンチマークした、私の[follow-up question](http://stackoverflow.com/q/42074501/5722359)にあなたを招待して前者はかなり遅く、私は理由を知りたいのですか? –

1

最初の質問については、mapを呼び出した時点でのみ値が決定されるが、マップされた関数のすべてのインスタンスに対して一定の引数を渡すことを正しく理解していますか?

from functools import partial 
refval = 5 

def _findmatch(ref, listnumber): # arguments swapped 
    ... 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    for n in executor.map(partial(_findmatch, refval), numberlist): 
     ... 

日時:もしそうなら、私はfunctools.partialを使用して、それに焼い第二引数(あなたの例ではref)と「テンプレート機能」から派生機能でmapを行うだろう。質問2、最初の部分:私は、並列で実行されるべき関数をピケット(シリアライズ)しようとするコードの正確な部分を見つけられませんでしたが、それは自然に起こります。引数だけでなく、機能はワーカーに転送する必要があります。この転送のために関数をシリアル化する必要があります。 partialの機能は、lambdaが他の場所に記載されていない間に節約することができます。例えば、https://stackoverflow.com/a/19279016/6356764です。

Re。質問2、2番目の部分:ProcessPoolExecutor.mapに2つ以上の引数を持つ関数を呼び出す場合は、関数に最初の引数として渡し、その後に関数の最初の引数の繰り返し、2番目の引数の繰り返しあなたのケースでは、引数など:あなたが使用しているようなlambda機能がpickle化ではないため

for n in executor.map(_findmatch, numberlist, ref): 
    ... 
+0

おかげです。 :)ソリューションが機能しました。部分的に学ぶのは初めてのことです。 –

+0

私は、[ExecutorMap]のパフォーマンスを 'Executor.submit'でベンチマークした、私の[follow-up question](http://stackoverflow.com/q/42074501/5722359)にあなたを招待して前者はかなり遅く、私は理由を知りたいのですか? –

2

(1)必要はリストを作成しないように。 itertools.repeatを使用すると、ちょうどその値を繰り返すイテレータを作成できます。

(2)実行のためにサブプロセスに渡されるため、mapに名前付き関数を渡す必要があります。 mapは、pickleプロトコルを使用して物を送信します。ラムダは酸洗いすることができないため、マップの一部にすることはできません。しかし、それは全く不要です。すべてのラムダは、2つのパラメータを持つ2つのパラメータ関数を呼び出しました。完全に取り外します。

作業コードを共有するための

import concurrent.futures as cf 
import itertools 

nmax = 10 
numberlist = range(nmax) 
workers = 3 

def _findmatch(listnumber, ref):  
    print('def _findmatch(listnumber, ref):') 
    x='' 
    listnumber=str(listnumber) 
    ref = str(ref) 
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref)) 
    if ref in listnumber: 
     x = listnumber 
    print('x = {0}'.format(x)) 
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor: 
    #for n in executor.map(_findmatch, numberlist): 
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)): 
     print(type(n)) 
     print(n) 
     #if str(ref[0]) in n: 
     # print('match') 
+0

説明と解決策をありがとう。 :) –

+0

私は[ExecutorMap]のパフォーマンスを 'Executor.submit'でベンチマークした[フォローアップの質問](http://stackoverflow.com/q/42074501/5722359)にあなたを招待しています。前者がかなり遅いことを発見し、私はなぜそれを知りたいのですか? –

関連する問題