2016-05-14 5 views
4

Pythonのmultiprocessing.Poolを使用すると、mapという奇妙な動作があります。下の例では、4つのプロセッサのプールが28のタスクで動作します。これは、それぞれ4秒かかる7回のパスを取る必要があります。最後のプロセスのpythonマルチプロセッシングマップの誤った取り扱い

ただし、8回かかります。最初の6回のパスでは、すべてのプロセッサが使用されます。 7回目のパスでは、2つのタスク(2つのアイドリング・プロセッサー)のみが完了します。残りの2つのタスクは8番目のパスで終了します(2つのアイドリングプロセッサー、もう一度)。この動作は、cpusの数とタスクの数が一見無作為に組み合わされ、不必要に時間が失われた場合に表示されます。

この例は、Intel Xeon Haswell(20コア)とIntel i7(4コア)の両方で再現されています。

Poolをすべてのパスで使用可能なすべてのプロセッサを使用するように強制する方法はありますか?

import time 
import multiprocessing 
from multiprocessing import Pool 
import datetime 

def f(values): 
    now = str(datetime.datetime.now()) 
    proc_id = str(multiprocessing.current_process()) 
    print(proc_id+' '+now) 
    a=values**2 
    time.sleep(4) 
    return a 

if __name__ == '__main__': 
    p = Pool(4) #number of processes 
    processed_values= p.map(f, range(28)) 
    p.close() 
    p.join() 
    print processed_values 

実行の出力は、これは明確か正しい答えを持っていない、次の質問に関連している

<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:49.604065 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:49.604189 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:49.604252 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:49.604866 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:53.608475 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:53.608878 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:53.608931 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:53.609503 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:57.612831 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:57.613135 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:57.613555 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:57.614065 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:01.616974 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:01.617273 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:01.617699 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:01.618190 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:05.621284 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:05.621489 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:05.622130 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:05.622404 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:09.625522 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:09.625631 
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:09.626555 
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:09.626566 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:13.629761 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:13.629846 
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:17.634003 
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:17.634317 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729] 

以下の通りです。 Python: Multiprocessing Map takes longer to complete last few processes

+0

特有!使用しているPythonのバージョンとOSを特定する必要があります。興味深い:あなたが私のボックスに書いているものを見ていますが、 'range(32)'や 'range(24)'に変更しても、それはちょっと困っています;-) –

+0

@dano:素晴らしい回答! – Luis

答えて

3

これは、あなたがそれを渡すとPoolで各ワーカーに送信しPool.mapチャンクアップ反復可能な方法によって引き起こされます。

import time 
import multiprocessing 
from multiprocessing import Pool 
import datetime 

def f(values): 
    now = str(datetime.datetime.now()) 
    proc_id = str(multiprocessing.current_process()) 
    print(proc_id+' '+now) 
    a=values**2 
    time.sleep(4) 
    return a 

if __name__ == '__main__': 
    p = Pool(4) #number of processes 
    processed_values= p.map(f, range(28), chunksize=1) 
    p.close() 
    p.join() 
    print processed_values 

が出力:

<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:06.548733 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:06.548803 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:06.549013 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:06.549052 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:10.549509 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:10.551091 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:10.553057 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:10.553263 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:14.553765 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:14.553821 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:14.554953 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:14.557262 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:18.556535 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:18.556611 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:18.558019 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:18.561597 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:22.560039 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:22.560097 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:22.562236 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:22.565912 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:26.564383 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:26.564430 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:26.564589 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:26.570232 
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:30.568634 
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:30.568647 
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:30.568752 
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:30.574456 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729] 

mapは、あなたが1つを提供していない時にチャンクサイズを選ぶために使用するアルゴリズムあなたが1であることをchunksizeを強制する場合は、あなたが期待する動作が表示されます次のようになります。

サイズ28の反復可能なために
if chunksize is None: 
     chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 
     if extra: 
      chunksize += 1 
    if len(iterable) == 0: 
     chunksize = 0 

、それが出てくる2にこれは、各ワーカー・プロセスは、一度に一つのあなたのiterableから2つの項目がないつかむことを意味します。したがって、キューに4つのアイテムしか残っていない場合、最初のフリーワーカーは2つを取得し、2番目のフリーワーカーは2つを取得します。

最初のチャンク化の理由は、IPCのオーバーヘッドを減らすことによって、非常に大きなイテラブルを処理するときのパフォーマンスが大幅に向上するからです。より小さなiterableの場合、この場合と同じように、それはあまり差をつけないか、パフォーマンスを傷つける傾向があります。

+1

また、@ TimPetersが観察したように、24と32の理由は、それらが両方とも8で割り切れるからです。一度に2つのアイテムを取る4人の作業者は、イテラブルが4x2で均等に割り切れる必要があることを意味します。 – dano

+0

右!ちょうど 'chunksize = 7'が元の例でも働いていることに注目して、4つのプロセスのそれぞれに7つの項目を適用します。 –

関連する問題