2017-12-11 13 views
0

Pool.apply_asyncを使用して呼び出された関数にQueueオブジェクトを渡すと、関数は失敗し、ApplyResult.successful()と表示されます。この場合、関数の本体はまったく動作していないように見えます。Pool.apply_asyncで呼び出されたときに、キューを渡すと失敗するのはなぜですか?

別のプロセスの結果の収集をsuggested by the multiprocessing documentationとして同期させるためにキューを使用する予定でしたが、キューが機能で実際に使用されていなくてもエラーが発生します。

from multiprocessing import Pool, Queue 
import time 
from random import randint 

def sample_function(name, results): 
    delay_ms = randint(1, 10) 
    print ("{} starting with delay {:d}".format(name, int(delay_ms))) 
    time.sleep(delay_ms) 
    # results argument is unused! 
    #results.put("{} result".format(name)) 
    print ("{} ending".format(name)) 

resultsQueue = Queue() 
jobs = ['one','two','three','four', 'five', 'six'] 

pool = Pool(processes=4) 
# fails 
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs] 
# succeeds 
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs] 

pool.close() 
print('closing: no more tasks') 
pool.join() 

for status in jobStatuses: 
    print (status.ready(), status.successful()) 

while not resultsQueue.empty(): 
    print(resultsQueue.get()) 
print('All finished') 

私はPool.apply_asyncなく、同じ関数を呼び出すことができますし、それが成功します:sample_function('test without pool', resultsQueue)を。文字列でPool.apply_async関数を呼び出すこともできます。これは成功します。

答えて

3

apply_asyncコールでは、RuntimeErrorが発生し、multiprocessing.Queueは消音になります。

for status in job_statuses: 
    print(status.__dict__) 

出力:
あなたのコードを変更する少し私はそれをトレースすることができました

{ '_value':RuntimeErrorが( 'キューオブジェクトにのみ継承によってプロセス間で共有されなければなりません」)、 '_SUCCESS' 偽 '_callback':なし、 '_CACHE':{}、 '_job' 0 '_error_callback':なし、 '_event'}

X6時間。

これは、プロセス間で共有できるManager().Queue()を使用することで解決します。

関連する問題