DaskおよびDistributedを使用して多くの成功を収め、データ解析パイプラインを開発しています。しかし、私がまだ改善を楽しみにしていることの1つは、例外を処理する方法です。Daskで例外を処理する方法
今私は、次の
def my_function (value):
return 1/value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
print(results.compute())
を書き、場合...その後、私はトレースバックの長い、長いリスト取得プログラム実行中に(労働者に1つずつ、私は推測しています)。ここで
distributed.utils - ERROR - division by zero
Traceback (most recent call last):
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
result[0] = yield gen.maybe_future(func(*args, **kwargs))
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get
result = yield self._gather(packed)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather
st.traceback)
File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify
File "test.py", line 9, in my_function
return 1/value
ZeroDivisionError: division by zero
が最も関連するセグメントは、当然のことながら、目視検査では誤差がゼロで番号を除算していることを私に教えてくれます。私はこれらのエラーを追跡するためのより良い方法があるのだろうと思っています。例えば、私は例外自体をキャッチすることができるように見えることはできません。
import dask.bag
import distributed
try:
dask_scheduler = "127.0.0.1:8786"
dask_client = distributed.Client(dask_scheduler)
def my_function (value):
return 1/value
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
#dask_client.persist(results)
print(results.compute())
except Exception as e:
print("error: %s" % e)
EDIT:私の例では、私はだけではなく、DASK、を分散使用していますので注意してください。 dask-scheduler
がポート8786でリッスンしており、4つのdask-worker
プロセスが登録されています。
このコードは、上記と全く同じ出力を生成します。つまり、実際にはtry
/except
ブロックで例外をキャッチしていません。
ここでは、クラスタ全体で分散タスクを話しているので、明らかに例外を私に伝えていくことは自明ではありません。そうするためのガイドラインはありますか?今私の解決策は、結果とオプションのエラーメッセージの両方を返し、その後、個別の結果やエラーメッセージを処理する機能を持つことです。
def my_function (value):
try:
return {"result": 1/value, "error": None}
except ZeroDivisionError:
return {"result": None, "error": "boom!"}
results = (dask.bag
.from_sequence(range(-10, 10))
.map(my_function))
dask_client.persist(results)
errors = (results
.pluck("error")
.filter(lambda x: x is not None)
.compute())
print(errors)
results = (results
.pluck("result")
.filter(lambda x: x is not None)
.compute())
print(results)
これは動作しますが、私はここにsandblasting the soup crackerだ場合、私は思ったんだけど。 EDIT:もう一つの選択肢は、Maybe
モナドのようなものを使うことですが、もう一度私はそれを考えすぎているかどうかを知りたいと思います。
これは興味深いものです。これは私が 'distributed'を使用していて、' Client'オブジェクトを使用して私の仕事を提出しているという(鍵の)違いがある、まったく同じコードを実行する動作ではありません。例外のリレイニングの動作は** Dask **のみで処理されますが、** distributed **では処理されないことがありますか?追加情報として、ワーカーはリモートの計算クラスタ上にあります。 – ajmazurie
分散スケジューラーへの接続を含む答えを更新しました。結果は同じです。 – MRocklin