2017-08-31 9 views
1

dask(async)フレームワークを使用して単純なタスク(インスタンスメソッド)を実行しようとしていますが、シリアル化エラーで失敗します。daskを使用してタスクをサブミットするときにピクルエラーが発生する

誰かが私を正しい方向に向けることができますか?ここで

は、私が実行しているコードです:

from dask.distributed import Client, as_completed 
import time 

class DaskConnect: 

def __init__(self): 
    print("Initialized:",self.__class__.__name__) 
    self.scheduler_host="192.168.0.4" 
    self.scheduler_port="8786" 

def connect(self): 
    self.client = Client(self.scheduler_host+":"+self.scheduler_port) 
    # self.client = Client() 
    return self.client 

def disconnect(self): 
    self.client.close() 

class TestDask: 
def __init__(self): 
    print("Initialized:",self.__class__.__name__) 
    self.dask_client=DaskConnect().connect() 

def do_task(self,msg): 
    time.sleep(30) 
    return msg 

def run(self): 
    tasks=[1] 
    # tasks = [1, 2, 3, 4, 5] 
    futures=[] 
    for task in tasks: 
     print("Submitting:",task) 
     future = self.dask_client.submit(self.do_task, "Task:"+str(task)) 
     futures.append(future) 

    for future in as_completed(futures): 
     result = future.result() 
     print("Result",result) 

TestDask().run() 

エラー:

distributed.protocol.pickle - INFO - Failed to serialize main.TestDask object at 0x101c408d0>>. Exception: can't pickle select.kqueue objects Traceback (most recent call last):

答えて

1

DASKクライアントが現在直列化可能ではありません。 Dask Clientを含むオブジェクトもすべて直列化できません。一般に、アクティブなネットワーク接続、ロックなどを含むものをシリアル化することは難しいです。

おそらく別の方法がありますか?

+0

右方向を指すための@MRに感謝します。関数実行時にクライアントをインスタンス変数からローカル変数に変更すると、問題が解決しました。 –

関連する問題