2016-10-27 6 views
4

私はPythonのmultiprocessingモジュールと共有メモリで遊んでいます。 Processで共有メモリオブジェクトを使用できますが、Poolでは使用できません。 Poolのコールバックを追加しましたが、コールバックが呼び出されないようです。プールを使用して共有メモリオブジェクトを変更できません

from multiprocessing import Array, Pool, Process 

def flip(x,a): 
    a[x] = 0 if a[x] else 1 
    return (x, a[x]) 

def cb(result): 
    print(result) 

if __name__ == '__main__': 

    # size of array 
    N = 10 

    # shared array - N bytes - unsynchronized - initialized to zeros 
    a = Array('B', N, lock=False) 

    # flip values to ones using Process 
    processes = [Process(target=flip, args=(x, a)) for x in range(N)] 
    for p in processes: p.start() 
    for p in processes: p.join() 
    print([a[i] for i in range(N)])  

    # flip values back to zeros using Pool 
    pool = Pool(processes=4) 
    for x in range(N): 
     pool.apply_async(flip, args=(x, a), callback=cb) 
    pool.close() 
    pool.join() 
    print([a[i] for i in range(N)]) 

私は私の共有配列はすべて1の、単一callbackによって印刷された線とすべて0で再び配列が続くと、一度印刷し得ることを期待し、代わりにこれを得るだろう。

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1] 

Poolがタスクを実行していないのはなぜですか?

最小限の例のために、共有メモリの取り出し。

def f(x): 
    return x 

def cb(result): 
    print('cb',result) 

if __name__ == '__main__': 

    pool = Pool(processes=4) 
    pool.apply_async(f, range(10), callback=cb) 
    pool.close() 
    pool.join() 

これは、0から9までの数字を別々の行に出力すると思いますが、何も出力しません。

すぐに上記のapply_syncコールを置き換えると、

pool.apply_async(f, args=[10], callback=cb) 

私は出力range(10)[10][1,2,3][(1),(2),(3)]、または([1],[2],[3])何も出力が得られないの交換

cb 10 

を取得します。

+1

xの範囲(N):pool.apply .... flipを4回適用していますか? 1 - 0 - 1 - 0 - 1(1で終わり) – chapelo

+0

@chapelo - 4はプール内の作業者の数です。 'のための'はそれらのために10のタスクを作成する必要があります。 'cb'は決して呼び出されないので、どのタスクも実行されていないようです。 – CAB

+0

プールは自動的にワーカー間の作業を分割しますが、間違ってargsを渡しています – Aaron

答えて

0

multiprocessingの使用を考慮すると、通常、データは非常に大きくなります。 Nサイズの配列に対してN個のプロセスを実行したように、各データに1つのプロセスを割り当てるのは意味がありません。各プロセスは、アレイのチャンクを処理する)

1:

は、これら二つのアプローチを考えます。 flip_many()およびpartition()

2)各データはプールワーカーにマップされています。 flip_one()

残りのコードは元のコードに非常に近いです。

from multiprocessing import Array, Pool, Process 

def flip_many(start_idx, end_idx): 
    for idx in range(start_idx, end_idx + 1): 
     a[idx] = not(a[idx]) 

def flip_one(idx): 
    a[idx] = not(a[idx]) 
    return idx, a[idx] 

def cb(result): 
    print(result) 

def partition(range_, n): 
    start, end = range_ 
    size = (end - start) // n 
    ranges = [] 
    for _ in range(n): 
     ranges.append((start, start+size-1)) 
     start += size 
    if ranges[-1][1] != end-1: 
     ranges[-1] = (ranges[-1][0], end-1) 
    return ranges  

if __name__ == '__main__': 

    # size of array 
    N = 10 
    N_procs = 2 
    ranges = partition((0, N), N_procs) 

    # shared array - N bytes - unsynchronized - initialized to zeros 
    a = Array('B', N, lock=False) 
    print([a[i] for i in range(N)], "elements of array initialized to 0")  

    # flip values to ones using Process 

    processes = [] 
    for i in range(N_procs): 
     p = Process(target=flip_many, args=(*ranges[i],)) 
     processes.append(p) 
     p.start() 

    for p in processes: 
     p.join() 

    print([a[i] for i in range(N)], "First flip by N processes, should be 1")  

    # flip values back to zeros using Pool 
    pool = Pool() 
    indices = range(N) 
    pool.map(flip_one, indices) 
    print([a[i] for i in range(N)], "Second flip by the pool.map ... 0") 

    pool.map(flip_one, indices, chunksize=N // N_procs) 
    print([a[i] for i in range(N)], "Third flip by the pool.map ... 1") 

    pool.map_async(flip_one, indices, callback=cb) 
    print([a[i] for i in range(N)], "Fourth flip by the pool.map_async ... 0") 
    print(" Due to the async nature, flip not reflected until .join()") 
    print(" But the callback returns the correct results:") 

    pool.close() 
    pool.join() 
    print([a[i] for i in range(N)], "Content after the join... 0") 
関連する問題