
I have a function called `sig2z` that I want to apply over a dask array with `dask.array.map_blocks()`, but I get `RuntimeError: Resource temporarily unavailable`:

import numpy as np
import xarray as xr


def sig2z(da, zr, zi, nvar=None, dim=None, coord=None):
    r"""
    Interpolate variables on \sigma coordinates onto z coordinates.

    Parameters
    ----------
    da : `dask.array`
        The data on sigma coordinates to be interpolated
    zr : `dask.array`
        The depths corresponding to sigma layers
    zi : `numpy.array`
        The depths onto which to interpolate the data
    nvar : str (optional)
        Name of the variable. Only necessary when the variable is
        horizontal velocity.

    Returns
    -------
    dai : `dask.array`
        The data interpolated onto a spatially uniform z coordinate
    """

    if np.diff(zi)[0] < 0. or zi.max() <= 0.:
        raise ValueError("The values in `zi` should be positive and increasing.")
    if np.any(np.absolute(zr[0]) < np.absolute(zr[-1])):
        raise ValueError("`zr` should have the deepest depth at index 0.")
    if zr.shape != da.shape[-3:]:
        raise ValueError("`zr` should have the same "
                         "spatial dimensions as `da`.")

    if dim is None:
        dim = da.dims
    if coord is None:
        coord = da.coords
    N = da.shape
    nzi = len(zi)
    if len(N) == 4:
        dai = np.empty((N[0], nzi, N[-2], N[-1]))
    elif len(N) == 3:
        dai = np.empty((nzi, N[-2], N[-1]))
    else:
        raise ValueError("The data should have at least three dimensions")
    dai[:] = np.nan

    zi = -zi[::-1]  # ROMS has deepest level at index=0

    if nvar == 'u':    # u variables
        zl = .5*(zr.shift(eta_rho=-1, xi_rho=-1)
                 + zr.shift(eta_rho=-1)
                 )
    elif nvar == 'v':  # v variables
        zl = .5*(zr.shift(xi_rho=-1)
                 + zr.shift(eta_rho=-1, xi_rho=-1)
                 )
    else:
        zl = zr

    # _interpolate is a separate 1-D vertical interpolation helper
    # (defined elsewhere, not shown in the question).
    for i in range(N[-1]):
        for j in range(N[-2]):
            # only bother for sufficiently deep regions
            if zl[:, j, i].min() < -1e2:
                # only interp on z above topo
                ind = np.argwhere(zi >= zl[:, j, i].copy().min())
                if len(N) == 4:
                    for s in range(N[0]):
                        dai[s, :len(ind), j, i] = _interpolate(da[s, :, j, i].copy(),
                                                               zl[:, j, i].copy(),
                                                               zi[int(ind[0]):]
                                                               )
                else:
                    dai[:len(ind), j, i] = _interpolate(da[:, j, i].copy(),
                                                        zl[:, j, i].copy(),
                                                        zi[int(ind[0]):]
                                                        )

    return xr.DataArray(dai, dims=dim, coords=coord)

This works fine on an `xarray.DataArray`, but when I apply it via `dask.array.map_blocks` I get the error below:

test = dsar.map_blocks(sig2z, w[0].data,
                       zr.chunk({'eta_rho': 1, 'xi_rho': 1}).data, zi,
                       dim, coord,
                       chunks=dai[0].chunks, dtype=dai.dtype
                       ).compute()

--------------------------------------------------------------------------- 
RuntimeError        Traceback (most recent call last) 
<ipython-input-29-d81bad2f4486> in <module>() 
----> 1 test.compute() 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs) 
    95    Extra keywords to forward to the scheduler ``get`` function. 
    96   """ 
---> 97   (result,) = compute(self, traverse=False, **kwargs) 
    98   return result 
    99 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs) 
    202  dsk = collections_to_dsk(variables, optimize_graph, **kwargs) 
    203  keys = [var._keys() for var in variables] 
--> 204  results = get(dsk, keys, **kwargs) 
    205 
    206  results_iter = iter(results) 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs) 
    73  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    74       cache=cache, get_id=_thread_get_id, 
---> 75       pack_exception=pack_exception, **kwargs) 
    76 
    77  # Cleanup pools associated to dead threads 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs) 
    519       _execute_task(task, data) # Re-execute locally 
    520      else: 
--> 521       raise_exception(exc, tb) 
    522     res, worker_id = loads(res_info) 
    523     state['cache'][key] = res 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb) 
    58   if exc.__traceback__ is not tb: 
    59    raise exc.with_traceback(tb) 
---> 60   raise exc 
    61 
    62 else: 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception) 
    288  try: 
    289   task, data = loads(task_info) 
--> 290   result = _execute_task(task, data) 
    291   id = get_id() 
    292   result = dumps((result, id)) 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk) 
    269   func, args = arg[0], arg[1:] 
    270   args2 = [_execute_task(a, cache) for a in args] 
--> 271   return func(*args2) 
    272  elif not ishashable(arg): 
    273   return arg 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/dask/array/core.py in getarray(a, b, lock) 
    63   c = a[b] 
    64   if type(c) != np.ndarray: 
---> 65    c = np.asarray(c) 
    66  finally: 
    67   if lock: 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    425 
    426  def __array__(self, dtype=None): 
--> 427   self._ensure_cached() 
    428   return np.asarray(self.array, dtype=dtype) 
    429 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in _ensure_cached(self) 
    422  def _ensure_cached(self): 
    423   if not isinstance(self.array, np.ndarray): 
--> 424    self.array = np.asarray(self.array) 
    425 
    426  def __array__(self, dtype=None): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    406 
    407  def __array__(self, dtype=None): 
--> 408   return np.asarray(self.array, dtype=dtype) 
    409 
    410  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    373  def __array__(self, dtype=None): 
    374   array = orthogonally_indexable(self.array) 
--> 375   return np.asarray(array[self.key], dtype=None) 
    376 
    377  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order) 
    529 
    530  """ 
--> 531  return array(a, dtype, copy=False, order=order) 
    532 
    533 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/core/indexing.py in __array__(self, dtype) 
    373  def __array__(self, dtype=None): 
    374   array = orthogonally_indexable(self.array) 
--> 375   return np.asarray(array[self.key], dtype=None) 
    376 
    377  def __getitem__(self, key): 

/home/takaya/.conda/envs/arab/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__(self, key) 
    58   with self.datastore.ensure_open(autoclose=True): 
    59    try: 
---> 60     data = getitem(self.get_array(), key) 
    61    except IndexError: 
    62     # Catch IndexError in netCDF4 and return a more informative 

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:39743)() 

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:49835)() 

RuntimeError: Resource temporarily unavailable 

Could someone tell me why I get this error when I apply the function to a dask.array? Thanks in advance.
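One detail worth noting: the traceback above ends inside xarray's netCDF4 backend, so the failing task is reading lazily from the underlying file rather than from an in-memory array. A rough way to narrow this down (my own sketch, reusing the question's variable names and assuming `w` and `zr` come from `xr.open_dataset` on a netCDF file) is to load the inputs into memory first, or to evaluate the same `map_blocks` graph single-threaded:

# Load the netCDF-backed inputs eagerly so each map_blocks task sees plain
# NumPy data instead of triggering concurrent reads through the netCDF4 backend.
w_loaded = w[0].load()
zr_loaded = zr.load()

# Or build the graph as in the question but without the trailing .compute(),
# then evaluate it single-threaded to rule out thread contention on the file
# handle (older dask: test.compute(get=dask.get); newer dask: the line below).
result = test.compute(scheduler='synchronous')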


See the answer to this possible duplicate: https://stackoverflow.com/questions/22725165/resource-temporarily-unavailable-error-with-subprocess-module-in-python – Veltro

Answer


PID numbers, open file descriptors, and memory are all limited resources.

The fork(2) manual says when errno.EAGAIN should occur:

 
[EAGAIN] The system-imposed limit on the total number of processes under 
      execution would be exceeded. This limit is configuration-dependent. 

[EAGAIN] The system-imposed limit MAXUPRC() on the total number of processes 
      under execution by a single user would be exceeded. 

To reproduce the error more easily, you can add this at the start of your program:

import resource 

resource.setrlimit(resource.RLIMIT_NPROC, (20, 20)) 
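
Conversely, to see how close a program already is to these limits, the same resource module can read them back; this is just a quick check of my own (Linux/Unix only), not part of the linked answer:

import resource

# Soft and hard limits on processes per user and on open file descriptors;
# hitting either one can surface as EAGAIN / "Resource temporarily unavailable".
nproc_soft, nproc_hard = resource.getrlimit(resource.RLIMIT_NPROC)
nofile_soft, nofile_hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("max processes per user:", nproc_soft, nproc_hard)
print("max open file descriptors:", nofile_soft, nofile_hard)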

The problem may be that none of the child processes exits because you do not call p.stdin.close(), and gnuplot's stdin may be fully buffered when redirected to a pipe, i.e., the gnuplot processes may be stuck waiting for input. And/or your application uses too many file descriptors without releasing them (file descriptors are inherited by child processes by default on Python 2.7).
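
If you suspect the file-descriptor side, a rough way to watch it (my own addition, assuming a Linux /proc filesystem) is to count the entries under /proc/self/fd while the code that spawns gnuplot is running:

import os

def open_fd_count():
    """Count the file descriptors currently open in this process (Linux only)."""
    return len(os.listdir('/proc/self/fd'))

print("open file descriptors:", open_fd_count())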

If the input does not depend on the output and the input is limited in size, then use .communicate():

from subprocess import Popen, PIPE, STDOUT 

p = Popen("gnuplot", stdin=PIPE, stdout=PIPE, stderr=PIPE, 
      close_fds=True, # to avoid running out of file descriptors 
      bufsize=-1, # fully buffered (use zero (default) if no p.communicate()) 
      universal_newlines=True) # translate newlines, encode/decode text 
out, err = p.communicate("\n".join(['set terminal gif;', contents])) 

.communicate() writes all the input, reads all the output (concurrently, so there is no deadlock), then closes p.stdin, p.stdout, and p.stderr (even if the input is small and gnuplot's side is fully buffered, EOF flushes the buffer), and waits for the process to finish (no zombies). Popen calls _cleanup() in its constructor, which polls the exit status of all known subprocesses, i.e., even if you never call p.wait() there should not be many zombie processes (dead but unreaped).
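
As a small follow-up to the snippet above (my addition, reusing its p and err variables): once .communicate() has returned there is no need for an extra p.wait(); the exit status can be checked directly:

# After p.communicate() returns, the child has already been reaped,
# so its exit status is available immediately on the Popen object.
if p.returncode != 0:
    raise RuntimeError("gnuplot exited with status %d:\n%s" % (p.returncode, err))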

From: [Resource temporarily unavailable error with subprocess module in Python] https://stackoverflow.com/a/22729602/4879665


Unfortunately this does not help with my problem. – roxyboy
