2017-10-20 2 views
0

フロート番号のストリームを読み込み、簡単な計算を行い、その値をグローバルリストに追加します。私はどこが間違っているか教えていただけますか?リストは追加されません。Daskでグローバルリストを更新する

from random import random 
from time import sleep 

def process(x): 
    from random import random 
    sleep(random()*2) 
    t = x * 2 
    processed_queue.append(t) 
    print(processed_queue) 
    return t 

if __name__ == "__main__": 

    from distributed import Client 
    from queue import Queue 

    client = Client() 

    processed_queue = [] 
    input_q = Queue() 

    remote_q = client.scatter(input_q) 
    processed_q = client.map(process, remote_q) 
    result_q = client.gather(processed_q) 

    for i in [random() for x in range(100)]: 
     sleep(random()) 
     input_q.put(i) 
     print(i) 
     print(processed_queue) 
     print(result_q.qsize()) 

答えて

0

Whilest queue.Queuemultiprocessing.Queue は、スレッドおよびプロセス、プログラミング・バイ・副作用の一般この種類ではなくDASKによって促進モデルとの間でデータを送信するために使用することができます。

クラスタで実行された関数にデータを渡して、client.submitを使用してリアルタイムで戻り値を取得することができます。さらに、これを行う可能性のある共有変数のようないくつかのdask構造がありますが、(まったく)使用されることはほとんどなく、正しいパラダイムはないと思います。

Client()は、スケジューラ用に少なくとも1つの別個のプロセスを作成し、1つ以上のスレッドを持つワーカー用に作成します(タスクマネージャ、トップ、見ているツール)。 queue.Queueはプロセスローカルなので、各プロセスは空のキューを参照して追加しますが、その情報はメインプロセスには表示されず、入力キューのアクションはワーカーには表示されません。

関連する問題