1
dask(async)フレームワークを使用して単純なタスク(インスタンスメソッド)を実行しようとしていますが、シリアル化エラーで失敗します。daskを使用してタスクをサブミットするときにピクルエラーが発生する
誰かが私を正しい方向に向けることができますか?ここで
は、私が実行しているコードです:
from dask.distributed import Client, as_completed
import time
class DaskConnect:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.scheduler_host="192.168.0.4"
self.scheduler_port="8786"
def connect(self):
self.client = Client(self.scheduler_host+":"+self.scheduler_port)
# self.client = Client()
return self.client
def disconnect(self):
self.client.close()
class TestDask:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.dask_client=DaskConnect().connect()
def do_task(self,msg):
time.sleep(30)
return msg
def run(self):
tasks=[1]
# tasks = [1, 2, 3, 4, 5]
futures=[]
for task in tasks:
print("Submitting:",task)
future = self.dask_client.submit(self.do_task, "Task:"+str(task))
futures.append(future)
for future in as_completed(futures):
result = future.result()
print("Result",result)
TestDask().run()
エラー:
distributed.protocol.pickle - INFO - Failed to serialize main.TestDask object at 0x101c408d0>>. Exception: can't pickle select.kqueue objects Traceback (most recent call last):
右方向を指すための@MRに感謝します。関数実行時にクライアントをインスタンス変数からローカル変数に変更すると、問題が解決しました。 –