2016-11-18 10 views
0

私は関数からすべての値を収集し、後でアクセス可能なリストに追加する汎用ソリューションを試してみようとしています。これはconcurrent.futuresまたはthreadingタイプのタスクの間に使用されます。グローバルを使用せずにマルチスレッド中に関数の戻り値を収集する方法は?

from concurrent.futures import ThreadPoolExecutor 

master_list = [] 
def return_from_multithreaded(func): 
    # master_list = [] 
    def wrapper(*args, **kwargs): 
     # nonlocal master_list 
     global master_list 
     master_list += func(*args, **kwargs) 
    return wrapper 


@return_from_multithreaded 
def f(n): 
    return [n] 


with ThreadPoolExecutor(max_workers=20) as exec: 
    exec.map(f, range(1, 100)) 

print(master_list) 

を私はグローバルが含まれていない解決策を見つけるしたいと思い、おそらくクロージャとして格納されてコメントアウトmaster_listを返すことができます:ここで私はグローバルmaster_listを使用しているソリューションですか?

答えて

2

mapの結果を破棄しないでください。 mapは、各関数から返された値を返すので、無視しただけです。このコードは、その意図された目的のためにmapを使用することによって大幅に簡素化することができます

def f(n): 
    return n # No need to wrap in list 

with ThreadPoolExecutor(max_workers=20) as exec: 
    master_list = list(exec.map(f, range(1, 100))) 

print(master_list) 

あなたがこれまでに計算された結果は、(おそらくいくつかの他のスレッドがそれを見ている)を示しmaster_listが必要な場合は、あなただけのループを明示的に:

def f(n): 
    return n # No need to wrap in list 

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    for result in exec.map(f, range(1, 100)): 
     master_list.append(result) 

print(master_list) 

これはExecutorモデルのために設計されたものです。通常のスレッドは値を返すことを意図していませんが、エグゼキュータは、カバーの下の値を返すためのチャンネルを提供していますので、自分で管理する必要はありません。内部的には、これは何らかの形式のキューを使用しており、結果を順番に保持するためのメタデータを追加していますが、その複雑さに対処する必要はありません。あなたの見解では、それは通常のmap関数に相当しますが、ちょうどその作業を並列化することになります。


更新は例外を扱うカバーする:結果がヒットしたとき

mapは、労働者に発生したすべての例外を発生させます。したがって、記述されているように、タスクのいずれかが失敗した場合、最初のコードセットは何も格納しません(listは部分的に構築されますが、例外が発生すると破棄されます)。 2番目の例では、最初の例外がスローされる前に結果が保持され、残りは破棄されます(mapイテレータを格納し、それを避けるために厄介なコードを使用する必要があります)。成功したすべての結果を保存したり、何らかの方法でログに記録したりする必要がある場合は、submitlistFutureというオブジェクトを作成するのが最も簡単なものにして、シリアルまたは完了の順番で待ちます(.result())。 try/exceptを呼び出して良い結果を捨てないようにします。例えば、提出の順に結果を格納するために、あなたがしたい:より効率的なコードの場合

master_list = [] 
with ThreadPoolExecutor(max_workers=20) as exec: 
    futures = [exec.submit(f, i) for i in range(1, 100)] 
    exec.shutdown(False) # Optional: workers terminate as soon as all futures finish, 
          # rather than waiting for all results to be processed 
    for fut in futures: 
     try: 
      master_list.append(fut.result()) 
     except Exception: 
      ... log error here ... 

、あなたは彼らが終了して熱心に結果を取得するためにconcurrent.futures.as_completedを使用して、完了の順に結果ではなく、提出を取得することができます。

for fut in futures: 

は次のようになります:

as_completedは、すぐにすべての先物まで、彼らは完全に、代わりの遅延など yield ING完了/キャンセル先物の作業を行い
for fut in concurrent.futures.as_completed(futures): 

前のコードからの唯一の変更は、ということです早期に提出され、処理されます。

add_done_callbackを使用するより複雑なオプションがありますので、メインスレッドは結果を明示的に処理する必要はありませんが、通常は不要で、しばしば混乱するので、可能な限り避けるのが最善です。

+0

+1素晴らしい情報を共有します。私は疑問が1つありました。渡された関数によって例外が発生した場合、どのように動作しますか?それはそれを処理しますか? –

+0

@Moinuddin私の経験では、 'map'の代わりにThreadPoolExcecutorsを使ってエラー処理を行うには、' submit'を使います。これは未来を返し、それが完了した後に 'future.result()'を呼び出します。これにより、キャッチされた例外が発生します。 – flybonzai

+0

@flybonai:Yar。 'Future'の' list'を作成し、結果の順序が重要であれば、 'list'を繰り返し実行して、' try'/'except'をラップして順次実行します)。結果の順序が重要でない場合、 '' concurrent.futures.as_completed''(https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed)を使用すると '未来 "のオブジェクトは終了する(正常にまたは例外のために)。再び、 'try' /' except'ブロックで 'result'を呼び出してエラーを処理します。後者は、重要でない場合は一般に効率的です。 – ShadowRanger

2

私は過去にこの問題に直面しました:Running multiple asynchronous function and get the returned value of each function。これはそれを行うには、私のアプローチでした:

def async_call(func_list): 
    """ 
    Runs the list of function asynchronously. 

    :param func_list: Expects list of lists to be of format 
     [[func1, args1, kwargs1], [func2, args2, kwargs2], ...] 
    :return: List of output of the functions 
     [output1, output2, ...] 
    """ 
    def worker(function, f_args, f_kwargs, queue, index): 
     """ 
     Runs the function and appends the output to list, and the Exception in the case of error 
     """ 
     response = { 
      'index': index, # For tracking the index of each function in actual list. 
          # Since, this function is called asynchronously, order in 
          # queue may differ 
      'data': None, 
      'error': None 
     } 

     # Handle error in the function call 
     try: 
      response['data'] = function(*f_args, **f_kwargs) 
     except Exception as e: 
      response['error'] = e # send back the exception along with the queue 

     queue.put(response) 
    queue = Queue() 
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \ 
        for i, (func, args, kwargs) in enumerate(func_list)] 

    for process in processes: 
     process.start() 

    response_list = [] 
    for process in processes: 
     # Wait for process to finish 
     process.join() 

     # Get back the response from the queue 
     response = queue.get() 
     if response['error']: 
      raise response['error'] # Raise exception if the function call failed 
     response_list.append(response) 

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])] 

サンプル実行:あなたはグローバルを使用しない場合

def my_sum(x, y): 
    return x + y 

def your_mul(x, y): 
    return x*y 

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]] 

async_call(my_func_list) 
# Value returned: [3, 2] 
関連する問題