2017-02-28 3 views
4

DaskおよびDistributedを使用して多くの成功を収め、データ解析パイプラインを開発しています。しかし、私がまだ改善を楽しみにしていることの1つは、例外を処理する方法です。Daskで例外を処理する方法

今私は、次の

def my_function (value): 
    return 1/value 

results = (dask.bag 
    .from_sequence(range(-10, 10)) 
    .map(my_function)) 

print(results.compute()) 

を書き、場合...その後、私はトレースバックの長い、長いリスト取得プログラム実行中に(労働者に1つずつ、私は推測しています)。ここで

distributed.utils - ERROR - division by zero 
Traceback (most recent call last): 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f 
    result[0] = yield gen.maybe_future(func(*args, **kwargs)) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run 
    value = future.result() 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result 
    raise_exc_info(self._exc_info) 
    File "<string>", line 3, in raise_exc_info 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run 
    yielded = self.gen.throw(*exc_info) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in _get 
    result = yield self._gather(packed) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run 
    value = future.result() 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result 
    raise_exc_info(self._exc_info) 
    File "<string>", line 3, in raise_exc_info 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run 
    yielded = self.gen.throw(*exc_info) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 923, in _gather 
    st.traceback) 
    File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/six.py", line 685, in reraise 
    raise value.with_traceback(tb) 
    File "/mnt/lustrefs/work/aurelien.mazurie/test_dask/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/dask/bag/core.py", line 1411, in reify 
    File "test.py", line 9, in my_function 
    return 1/value 
ZeroDivisionError: division by zero 

が最も関連するセグメントは、当然のことながら、目視検査では誤差がゼロで番号を除算していることを私に教えてくれます。私はこれらのエラーを追跡するためのより良い方法があるのだろうと思っています。例えば、私は例外自体をキャッチすることができるように見えることはできません。

import dask.bag 
import distributed 

try: 
    dask_scheduler = "127.0.0.1:8786" 
    dask_client = distributed.Client(dask_scheduler) 

    def my_function (value): 
     return 1/value 

    results = (dask.bag 
     .from_sequence(range(-10, 10)) 
     .map(my_function)) 

    #dask_client.persist(results) 

    print(results.compute()) 

except Exception as e: 
    print("error: %s" % e) 

EDIT:私の例では、私はだけではなく、DASKを分散使用していますので注意してください。 dask-schedulerがポート8786でリッスンしており、4つのdask-workerプロセスが登録されています。

このコードは、上記と全く同じ出力を生成します。つまり、実際にはtry/exceptブロックで例外をキャッチしていません。

ここでは、クラスタ全体で分散タスクを話しているので、明らかに例外を私に伝えていくことは自明ではありません。そうするためのガイドラインはありますか?今私の解決策は、結果とオプションのエラーメッセージの両方を返し、その後、個別の結果やエラーメッセージを処理する機能を持つことです。

def my_function (value): 
    try: 
     return {"result": 1/value, "error": None} 
    except ZeroDivisionError: 
     return {"result": None, "error": "boom!"} 

results = (dask.bag 
    .from_sequence(range(-10, 10)) 
    .map(my_function)) 

dask_client.persist(results) 

errors = (results 
    .pluck("error") 
    .filter(lambda x: x is not None) 
    .compute()) 

print(errors) 

results = (results 
    .pluck("result") 
    .filter(lambda x: x is not None) 
    .compute()) 

print(results) 

これは動作しますが、私はここにsandblasting the soup crackerだ場合、私は思ったんだけど。 EDIT:もう一つの選択肢は、Maybeモナドのようなものを使うことですが、もう一度私はそれを考えすぎているかどうかを知りたいと思います。

答えて

0

Daskは、リモートで発生した例外を自動的にパッケージ化し、ローカルで再検証します。あなたの事例を実行すると、私は何を得るのですか

In [1]: from dask.distributed import Client 

In [2]: client = Client('localhost:8786') 

In [3]: import dask.bag 

In [4]: try: 
    ...:  def my_function (value): 
    ...:   return 1/value 
    ...: 
    ...:  results = (dask.bag 
    ...:   .from_sequence(range(-10, 10)) 
    ...:   .map(my_function)) 
    ...: 
    ...:  print(results.compute()) 
    ...: 
    ...: except Exception as e: 
    ...:  import pdb; pdb.set_trace() 
    ...:  print("error: %s" % e) 
    ...:  
distributed.utils - ERROR - division by zero 
> <ipython-input-4-17aa5fbfb732>(13)<module>() 
-> print("error: %s" % e) 
(Pdb) pp e 
ZeroDivisionError('division by zero',) 
+0

これは興味深いものです。これは私が 'distributed'を使用していて、' Client'オブジェクトを使用して私の仕事を提出しているという(鍵の)違いがある、まったく同じコードを実行する動作ではありません。例外のリレイニングの動作は** Dask **のみで処理されますが、** distributed **では処理されないことがありますか?追加情報として、ワーカーはリモートの計算クラスタ上にあります。 – ajmazurie

+0

分散スケジューラーへの接続を含む答えを更新しました。結果は同じです。 – MRocklin

関連する問題