2016-04-14 10 views
1

Daskの分散データフレームでMatthew Rocklin's postから作業しています。私はクラスタ全体でいくつかの集計統計計算を配布しようとしています。 dcluster ...でクラスタを設定すると問題はありません。ノートブックの中で、どのようにNFSファイルにdask + distributedを使用しますか?

import dask.dataframe as dd 
from distributed import Executor, progress 
e = Executor('...:8786') 

df = dd.read_csv(...) 

私が読んでいるファイルは、すべてのワーカーマシンがアクセスできるNFSマウント上にあります。この時点で私は例えばdf.head()を見ることができ、すべてが正しいように見えます。ブログの記事から、私は私がこれを行うことができるはずだと思う:

df_future = e.persist(df) 
progress(df_future) 
# ... wait for everything to load ... 
df_future.head() 

をしかし、それは誤りです:

--------------------------------------------------------------------------- 
AttributeError       Traceback (most recent call last) 
<ipython-input-26-8d59adace8bf> in <module>() 
----> 1 fraudf.head() 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py in head(self, n, compute) 
    358 
    359   if compute: 
--> 360    result = result.compute() 
    361   return result 
    362 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs) 
    35 
    36  def compute(self, **kwargs): 
---> 37   return compute(self, **kwargs)[0] 
    38 
    39  @classmethod 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs) 
    108     for opt, val in groups.items()]) 
    109  keys = [var._keys() for var in variables] 
--> 110  results = get(dsk, keys, **kwargs) 
    111 
    112  results_iter = iter(results) 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs) 
    55  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    56       cache=cache, queue=queue, get_id=_thread_get_id, 
---> 57       **kwargs) 
    58 
    59  return results 

/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs) 
    479     _execute_task(task, data) # Re-execute locally 
    480    else: 
--> 481     raise(remote_exception(res, tb)) 
    482   state['cache'][key] = res 
    483   finish_task(dsk, key, state, results, keyorder.get) 

AttributeError: 'Future' object has no attribute 'head' 

Traceback 
--------- 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task 
    result = _execute_task(task, data) 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task 
    return func(*args2) 
    File "/work/analytics2/analytics/python/envs/analytics/lib/python3.5/site-packages/dask/dataframe/core.py", line 354, in <lambda> 
    dsk = {(name, 0): (lambda x, n: x.head(n=n), (self._name, 0), n)} 

それは通常のファイルから来るとき、データフレームを配布する権利のアプローチは何システムの代わりにHDFS?

答えて

1

Daskはシングルマシンスケジューラを使用しようとしています。これは、通常のdaskライブラリを使用してデータフレームを作成した場合のデフォルトです。デフォルトのクラスタを使用するには、次の行を使用します。

import dask 
dask.set_options(get=e.get) 
関連する問題