2017-08-19 10 views
2

APIコールが機能しなかった場合、url = urlまたはurlというAPI呼び出しの結果を返す関数task(url, param1, param2)を呼び出しています。私taskのようなものになります。マルチプロセスの例外からの戻り値

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     return url 

を今、私は100件のURLのリストにtaskを適用してmultiprocessingにそれらを開始したい:上記run_tasksから

import multiprocessing as mp 

def run_tasks(urls, param1, param2): 
    jobs = [] 
    for i in range(len(urls)): 
     process = mp.Process(target=task, args=(urls[i], param1, param2)) 
     jobs.append(process) 

    ## catch error processes 
    error_urls = [] 

    ## start processes 
    for j in jobs: 
     j.start() 

    ## finish processes 
    for j in jobs: 
     j.join() 

、どのように私はのリストを返します私にValueErrorを与えてくれたurl?私はerror_urls.append(j.join())を試しましたが、これはうまくいきませんでした。

答えて

2

このプロセスの結果を得るには2つの方法があります。

方法1 Managerからlistを使用してください。プロセス間で同期をとるためにロックを使用する必要はありません。

from multiprocessing import Process, Manager 

def task(url, param1, param2, error_list): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     error_list.append(url) 

def run_tasks(urls, param1, param2): 

    error_list = Manager().list()  
    jobs = [] 

    for i in range(len(urls)): 
     process = Process(target=task, args=(urls[i], param1, param2, error_list)) 
     jobs.append(process) 

    ## start processes 
    for j in jobs: 
     j.start() 

    ## finish processes 
    for j in jobs: 
     j.join() 

方法2. ProcessPoolExecutorconcurrent.futuresから。この方法は理解しやすく、コードも簡単です。

from concurrent import futures 

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     return url 

def runt_tasks(urls, param1, param2): 

    with futures.ProcessPoolExecutor() as executor: 
     result = executor.map(task, urls, [param1] * len(urls), [param2] * len(urls)) 

    error_list = [item for item in result if item is not None] 

最後に、質問の説明から。これはIOに敏感な問題です。 ThreadPoolExecutorを使用することをお勧めします。 IO操作を行うと、スレッドはGILを解放して他のスレッドを実行させます。 CPUに敏感な問題の場合は、ProcessPoolExecutorを使用することをお勧めします。 asyncioは、Python 3で並行プログラミングを行うもう一つの選択肢です。

1

共有メモリを試してください。除く)(

RawArrayと同じ:これはmultiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

あなたはrun_tasks

from multiprocessing import Process, Lock 
from multiprocessing.sharedctypes import Array 
lock = Lock() 
error_urls = Array(c_char_p, [], lock = lock) 

と配列のDOC()など

def task(url, param1, param2): 
    try: 
     make_api_call(url, param1, param2) 
    except ValueError as e: 
     print("val error")    
     error_urls.append(url) 

でこれをで定義することができます使用しますそのロックの値によって異なります。 プロセスセーフ同期ラッパーm 生のctypes配列の代わりにが返されます。

したがって、プロセスセーフです。 Array()の詳細については、thisを参照してください。ctype(c_char_p)についてthis

関連する問題