以下の操作(Dask DataFrame APIドキュメントから適合)では、スケジューラにアタッチしないと)、操作は正常に正常に完了します。Dask Distributedは、compute()中に要求された操作を渡していないように見えます
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
connection_loc = 'foobar.net:8786'
# client = Client(connection_loc)
df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
同じ行がコメント解除され、クライアント接続が割り当てられた瞬間、次のエラーが発生します。TypeError: unorderable types: list() >= int()
は(多くのための完全なトレースバックを参照してください)。
トレースバックを調べると、逆シリアル化しようとしているバイト列が、逆シリアル化しようとしているはずのものではないことがわかります(フルトレースバックdistributed.protocol.pickle - INFO - Failed to deserialize
の1行目を参照)。
ワーカーとスケジューラの両方を実行しているリモートコンテナを完全に停止して再起動しました。私も運がないclient.restart()
を使用しました。なぜこの他のタスクがワーカーに渡され、このエラーがスローされたのか?これをやめるためにDaskを得るための解決策はありますか?
完全トレースバック:
dask_worker_1 | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94."
dask_worker_1 | Traceback (most recent call last):
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1 | return pickle.loads(x)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1 | if cell_count >= 0 else
dask_worker_1 | TypeError: unorderable types: list() >= int()
dask_worker_1 | distributed.worker - WARNING - Could not deserialize task
dask_worker_1 | Traceback (most recent call last):
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task
dask_worker_1 | self.tasks[key] = _deserialize(function, args, kwargs, task)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize
dask_worker_1 | args = pickle.loads(args)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
dask_worker_1 | return pickle.loads(x)
dask_worker_1 | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func
dask_worker_1 | if cell_count >= 0 else
dask_worker_1 | TypeError: unorderable types: list() >= int()
DASK:分散0.15.0 :1.17.1 OS:Ubuntuの16.04.2 LTS私はあなたが労働者の間であなたのcloudpickleバージョンに不一致があることを疑う
「client.get_versions(check = True)」は同じエラーを作成しました。 – kuanb
ああ、理想的には、どこのバージョンが不一致であるかを示す有益なエラーを返すでしょう。私はこのメソッド自体が機能するcloudpickleに依存していると思います。分散ソフトウェア環境は難しい。私が知っている大規模な展開の大部分は、すべてを同期させて管理するのに役立つものを使用しています。 – MRocklin
ありがとうございます。可能であれば、私は他に2つの質問があります。私は彼らが関連しているかもしれないのでここに彼らに尋ねる。一貫性のない依存関係を解決すると、get_versions( 'distributed.utils - ERROR - ストリームがクローズされています:リモートメソッド 'broadcast''を呼び出そうとしている間に)でトルネードエラーが発生しました。 get_versionsの実行をスキップすると、 'distributed.utils - ERROR - ("( 'apply-29db5629d323ed627f7f91b2363edb30'、0) "、 'tcp://10.0.0.248:39689')というエラーが表示されます。 – kuanb