2016-05-26 14 views
1

一連の発射:私は遅延タスクを実行することでarray内のすべての要素にアクセスしたいこのDASKコードの場合並列タスク

def inc(x): 
    return x + 1 

for x in range(5): 
    array[x] = delay(inc)(x) 

を。しかし、arrayは機能ではないので、array.compute()に電話することはできません。私は

for x in range(5): 
    array[x].compute() 

を行う場合は、各タスクを並列に実行されますんやa[1] GETしかa[0]終了後に発生しますか?このコードを書く良い方法はありますか?

+0

で多くの遅れ値を計算するdask.compute機能を使用することができますが、私は質問を誤解し、私のコメントを削除します。 –

答えて

0

あなたが長い時間がかかるように強制すると、状況が並行して実行されているかどうかを簡単に知ることができます。このコードを実行する場合:

from time import sleep, time 
from dask import delayed 

start = time() 

def inc(x): 
    sleep(1) 
    print('[inc(%s): %s]' % (x, time() - start)) 
    return x + 1 

array = [0] * 5 
for x in range(5): 
    array[x] = delayed(inc)(x) 

for x in range(5): 
    array[x].compute() 

呼び出しが順番に発生することは非常に明白になります。ただし、最後のループを次のように置き換えた場合は、

delayed(array).compute() 

が表示されます。私のマシンでは、出力は次のようになります。

[inc(1): 1.00373506546] 
[inc(4): 1.00429320335] 
[inc(2): 1.00471806526] 
[inc(3): 1.00475406647] 
[inc(0): 2.00795912743] 

明らかに、実行された最初の4つのタスクは並列でした。おそらく、デフォルトの並列処理はマシン上のコアの数に設定されています。なぜなら、CPUを大量に使用するタスクでは、それ以上の処理を行うのは一般的に有用ではないからです。

+0

@JRRが私の答えを更新しました。より正当な方法のように見えますが、同じテストを実行するので、MRocklinの答えを使用することをお勧めします。 –

1

あなたは一度

from dask import delayed, compute 

array = [delayed(inc)(i) for i in range(5)] 
result = compute(*array) 
関連する問題