2017-07-06 19 views
1

私はdaskを使い始めようとしています。下のおもちゃの例では、私は3つの列、site,countsおよびreadingsを持っています。 siteおよびcountsはスカラーの列であり、readingsは3次元配列を含んでいます。Daskを使ってこの "入れ子になった"構造化配列の計算を実行するにはどうすればよいですか?

countsで計算を実行できますが、readingsで実行しようとすると例外が発生します。私はここで正しくdaskを使用していますか?

import dask.array as da 
import numpy as np 
import tables 

dtype = np.dtype([ 
    ('site', 'S1'), 
    ('counts', np.int8), 
    ('readings', np.float64, (2, 2, 3)) 
]) 

with tables.open_file('test.hdf5', 'w') as f: 
    sensors = f.create_table('/', 'sensors', description=dtype) 
    rows = [(site, count, np.random.rand(2, 2, 3)) 
      for count, site in enumerate('abcdefghij')] 
    sensors.append(rows) 
    sensors.flush() 

    # Operating on 'counts' column works fine... 
    x = da.from_array(f.root.sensors.cols.counts, chunks=5) 
    x_add = (x + 1).compute() 

    # But on 'readings' does not 
    y = da.from_array(f.root.sensors.cols.readings, chunks=5) 
    y_add = (y + 1).compute() 

(y + 1).compute()では、次の例外があります。 (それは非常に有用ではありませんので、一番下にある実際のエラーは、pytablesでTypeError例外のエラー文字列を構築するからのようです。)

TypeError         Traceback (most recent call last) 
<ipython-input-115-77c7e132695c> in <module>() 
    22  # But on readings column does not 
    23  y = da.from_array(f.root.sensors.cols.readings, chunks=5) 
---> 24  y_add = (y + 1).compute() 

~/miniconda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 
    95    Extra keywords to forward to the scheduler ``get`` function. 
    96   """ 
---> 97   (result,) = compute(self, traverse=False, **kwargs) 
    98   return result 
    99 

~/miniconda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 
    202  dsk = collections_to_dsk(variables, optimize_graph, **kwargs) 
    203  keys = [var._keys() for var in variables] 
--> 204  results = get(dsk, keys, **kwargs) 
    205 
    206  results_iter = iter(results) 

~/miniconda/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs) 
    73  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    74       cache=cache, get_id=_thread_get_id, 
---> 75       pack_exception=pack_exception, **kwargs) 
    76 
    77  # Cleanup pools associated to dead threads 

~/miniconda/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs) 
    519       _execute_task(task, data) # Re-execute locally 
    520      else: 
--> 521       raise_exception(exc, tb) 
    522     res, worker_id = loads(res_info) 
    523     state['cache'][key] = res 

~/miniconda/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb) 
    58   if exc.__traceback__ is not tb: 
    59    raise exc.with_traceback(tb) 
---> 60   raise exc 
    61 
    62 else: 

~/miniconda/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception) 
    288  try: 
    289   task, data = loads(task_info) 
--> 290   result = _execute_task(task, data) 
    291   id = get_id() 
    292   result = dumps((result, id)) 

~/miniconda/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk) 
    269   func, args = arg[0], arg[1:] 
    270   args2 = [_execute_task(a, cache) for a in args] 
--> 271   return func(*args2) 
    272  elif not ishashable(arg): 
    273   return arg 

~/miniconda/lib/python3.6/site-packages/dask/array/core.py in getarray(a, b, lock) 
    61   lock.acquire() 
    62  try: 
---> 63   c = a[b] 
    64   if type(c) != np.ndarray: 
    65    c = np.asarray(c) 

~/miniconda/lib/python3.6/site-packages/tables/table.py in __getitem__(self, key) 
    3455   else: 
    3456    raise TypeError(
-> 3457     "'%s' key type is not valid in this context" % key) 
    3458 
    3459  def __iter__(self): 

TypeError: not all arguments converted during string formatting 

そして最後に、x_addの値は上の1であるarray([ 6, 7, 8, 9, 10, 6, 7, 8, 9, 10], dtype=int8)、あります最後のチャンクは2回タイル張りました。私は期待するだろう[1, 2, ..., 10]。繰り返しますが、意図したとおりにdaskを使用しているかどうか疑問です。

+0

同じデータを2回取得する理由は、そのテーブルがスレッド間で状態を共有するため、すべてのワーカーが同じ最終データを参照するということです。これは分散スケジューラを使用すると消えます。 – mdurant

答えて

0

Dask.arrayは、numpyスタイルのスライスに従うように配列を渡します。 PyTablesはこれをサポートしていないようです。

In [12]: f.root.sensors.cols.counts 
Out[12]: /sensors.cols.counts (Column(10,), int8, idx=None) 

In [13]: f.root.sensors.cols.counts[:] 
Out[13]: array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int8) 

In [14]: f.root.sensors.cols.readings 
Out[14]: /sensors.cols.readings (Column(10, 2, 2, 3), float64, idx=None) 

In [15]: f.root.sensors.cols.counts[:, :, :, :] 
--------------------------------------------------------------------------- 
TypeError         Traceback (most recent call last) 
<ipython-input-15-5e59d077075e> in <module>() 
----> 1 f.root.sensors.cols.counts[:, :, :, :] 

/home/mrocklin/Software/anaconda/lib/python3.6/site-packages/tables/table.py in __getitem__(self, key) 
    3453   else: 
    3454    raise TypeError(
-> 3455     "'%s' key type is not valid in this context" % key) 
    3456 
    3457  def __iter__(self): 

TypeError: not all arguments converted during string formatting 

これが機能する場合は、代わりにh5pyを試すことをおすすめします。私は一般的にh5pyがPyTablesよりも少し賢明であることを発見しました。

+0

クイックヘルプありがとう。残念ながら、私は今、他のいくつかの問題に陥っていますので、h5pyに移行することはできませんが、チャンスが訪れたときにそれを見ていきます。 – capitalistcuttle

関連する問題