2017-06-28 3 views
2

以下の操作(Dask DataFrame APIドキュメントから適合)では、スケジューラにアタッチしないと)、操作は正常に正常に完了します。Dask Distributedは、compute()中に要求された操作を渡していないように見えます

from dask.distributed import Client 
import dask.dataframe as dd 
import pandas as pd 

connection_loc = 'foobar.net:8786' 
# client = Client(connection_loc) 

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [1., 2., 3., 4., 5.]}) 
ddf = dd.from_pandas(df, npartitions=2) 
foo = ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() 

同じ行がコメント解除され、クライアント接続が割り当てられた瞬間、次のエラーが発生します。TypeError: unorderable types: list() >= int()は(多くのための完全なトレースバックを参照してください)。

トレースバックを調べると、逆シリアル化しようとしているバイト列が、逆シリアル化しようとしているはずのものではないことがわかります(フルトレースバックdistributed.protocol.pickle - INFO - Failed to deserializeの1行目を参照)。

ワーカーとスケジューラの両方を実行しているリモートコンテナを完全に停止して再起動しました。私も運がないclient.restart()を使用しました。なぜこの他のタスクがワーカーに渡され、このエラーがスローされたのか?これをやめるためにDaskを得るための解決策はありますか?

完全トレースバック:

dask_worker_1  | distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95+\x01\x00\x00\x00\x00\x00\x00(\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_fill_function\x94\x93\x94(h\x00\x8c\x0f_make_skel_func\x94\x93\x94h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x02KCC\x0e|\x00j\x00d\x01\x83\x01j\x01\x83\x00S\x00\x94NK\x02\x86\x94\x8c\x07rolling\x94\x8c\x03sum\x94\x86\x94\x8c\x02df\x94\x85\x94\x8c\x1fdask_method/dask_dist_matrix.py\x94\x8c\x08<lambda>\x94K\rC\x00\x94))t\x94R\x94]\x94}\x94\x87\x94R\x94}\x94N}\x94tRN\x8c3('from_pandas-ddc065084280667dd51853b144bdd4e8', 0)\x94NK\x02K\x00)}\x94t\x94." 
dask_worker_1  | Traceback (most recent call last): 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads 
dask_worker_1  |  return pickle.loads(x) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func 
dask_worker_1  |  if cell_count >= 0 else 
dask_worker_1  | TypeError: unorderable types: list() >= int() 
dask_worker_1  | distributed.worker - WARNING - Could not deserialize task 
dask_worker_1  | Traceback (most recent call last): 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 1113, in add_task 
dask_worker_1  |  self.tasks[key] = _deserialize(function, args, kwargs, task) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/worker.py", line 573, in _deserialize 
dask_worker_1  |  args = pickle.loads(args) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads 
dask_worker_1  |  return pickle.loads(x) 
dask_worker_1  | File "/usr/local/lib/python3.5/site-packages/cloudpickle/cloudpickle.py", line 935, in _make_skel_func 
dask_worker_1  |  if cell_count >= 0 else 
dask_worker_1  | TypeError: unorderable types: list() >= int() 

DASK:分散0.15.0 :1.17.1 OS:Ubuntuの16.04.2 LTS私はあなたが労働者の間であなたのcloudpickleバージョンに不一致があることを疑う

答えて

2

クライアント。すべてのワーカーとクライアントが同じソフトウェア設定を持つようにする必要があります。あなたは助けるために、次のコマンドを試すことができます:

client.get_versions(check=True) 

私はこれがdask.distributedバージョン1.17.1でcloudpickleが含まれていると思いますが、後続のすべてのバージョンにすべきではありません。

+0

「client.get_versions(check = True)」は同じエラーを作成しました。 – kuanb

+1

ああ、理想的には、どこのバージョンが不一致であるかを示す有益なエラーを返すでしょう。私はこのメソッド自体が機能するcloudpickleに依存していると思います。分散ソフトウェア環境は難しい。私が知っている大規模な展開の大部分は、すべてを同期させて管理するのに役立つものを使用しています。 – MRocklin

+0

ありがとうございます。可能であれば、私は他に2つの質問があります。私は彼らが関連しているかもしれないのでここに彼らに尋ねる。一貫性のない依存関係を解決すると、get_versions( 'distributed.utils - ERROR - ストリームがクローズされています:リモートメソッド 'broadcast''を呼び出そうとしている間に)でトルネードエラーが発生しました。 get_versionsの実行をスキップすると、 'distributed.utils - ERROR - ("( 'apply-29db5629d323ed627f7f91b2363edb30'、0) "、 'tcp://10.0.0.248:39689')というエラーが表示されます。 – kuanb

0

他の回答にも言及されていますが、これはソフトウェアバージョンの不一致です。私も同じ問題がありました。

私はそれをすべて再び働かせるためにいくつかのことを行いました。私はdask_ec2を使用していたので、ここでそれらの変更を取り上げますが、どのようにクラスタを設定しているのか分かりません。

まず、ubuntu 16.04をローカルで使用していたので、分散サーバに同じバージョンがあれば同じライブラリなどがある可能性が高いと想像しましたが、これには問題がありました(https://github.com/dask/dask-ec2/issues/98参照)。要約:dask_ec2/salt.pyを変更し、__install_salt_rest_apiメソッドでcherrypy==3.2.3をダウンロードしました(詳細についてはリンクされた問題を参照)。

次に、dac_ec2を新しいバージョンのAnacondaを使用するように設定しました。 dask_ec2/formulas/salt/conda/settings.slsでは、にDOWNLOAD_URLラインを変更します。

{% set download_url = 'https://repo.continuum.io/archive/Anaconda2-5.0.1-Linux-x86_64.sh' %} 

{% set download_url = 'https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh' %} 

第三に、私は自分のライブラリを確実にするために、自分のコンピュータ上のアップデートを実行した日時までにあった。例えば

From:Upgrading all packages with pip

pip freeze --local | grep -v '^\-e' | cut -d = -f 1 | xargs -n1 pip install -U 

conda update --all 

私は最終的に全体の多くを再起動し、それがうまく働きました。

関連する問題