2017-11-15 22 views
2

daskを使用して大規模な高解像度海洋モデルのデータセットで経時的な線形傾向を計算しようとしています。daskパフォーマンスは軸に沿って適用されます

私はこの例(Applying a function along an axis of a dask array)に従っており、apply_along_axisの構文がわかりやすくなっています。

現在、を使用して1次元配列にnumpy関数をラップし、結果のdask配列をxarray Dataarrayにパッケージします。 top -u <username>を使用すると、計算が並行して実行されないことが示唆されます(〜100%cpu use)。

map_blocksより良いパフォーマンスが期待できますか?または、apply_along_axisのパフォーマンスを改善する方法に関する提案はありますか? どのようなヒントも高く評価されています。

import numpy as np 
from scipy import optimize 
import xarray as xr 
import dask.array as dsa 

def _lin_trend(y): 
    x = np.arange(len(y)) 
    return np.polyfit(x, y, 1) 



def linear_trend(da, dim, name='parameter'): 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 

    dims = list(da.dims) 
    dims[axis_num] = name 
    coords = da.rename({dim:name}).coords 
    coords[name] = ['slope', 'intercept'] 

    dsk = da.data 
    dsk_trend = dsa.apply_along_axis(_lin_trend,0,dsk) 
    out = xr.DataArray(dsk_trend, dims=dims, coords=coords) 
    return out 
+0

あなたのdaskチャンクの構成とデータアレイの形状を共有できますか? – jhamman

答えて

0

私はxarrayのapply_ufunc(後xarray v0.10以上が必要です)を使用して、類似した何かをやっています。これは、daskでapply_along_axis関数を使用するよりも、管理が少し容易になる可能性があります。

import xarray as xr 
import numpy as np 
from scipy import stats 

def _calc_slope(x, y): 
    '''wrapper that returns the slop from a linear regression fit of x and y''' 
    slope = stats.linregress(x, y)[0] # extract slope only 
    return slope 


def linear_trend(obj): 
    time_nums = xr.DataArray(obj['time'].values.astype(np.float), 
          dims='time', 
          coords={'time': obj['time']}, 
          name='time_nums') 
    trend = xr.apply_ufunc(_calc_slope, time_nums, obj, 
          vectorize=True, 
          input_core_dims=[['time'], ['time']], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

パフォーマンスが期待どおりでない理由について質問します。これは、いくつかの理由からである可能性があります。あなたのdask配列はどのようにチャンクされていますか?どのdaskスケジューラを使用していますか?私はあなたの設定がより良いアイデアを得た後、私の答えの2番目の部分を更新しますか?

1

最終的にパフォーマンスは私が取り組んでいるファイルシステムによって制限されていると思います。

<xarray.Dataset> 
Dimensions:   (st_edges_ocean: 51, st_ocean: 50, time: 101, xt_ocean: 3600, yt_ocean: 2700) 
Coordinates: 
    * xt_ocean  (xt_ocean) float64 -279.9 -279.8 -279.7 -279.6 -279.5 ... 
    * yt_ocean  (yt_ocean) float64 -81.11 -81.07 -81.02 -80.98 -80.94 ... 
    * st_ocean  (st_ocean) float64 5.034 15.1 25.22 35.36 45.58 55.85 ... 
    * st_edges_ocean (st_edges_ocean) float64 0.0 10.07 20.16 30.29 40.47 ... 
    * time   (time) float64 3.634e+04 3.671e+04 3.707e+04 3.744e+04 ... 

だから、それはかなり大きく、ディスクから読み取るのに長い時間を必要とします。しかし、あなたの質問に答えるために、私のデータセットは、以下の形状を有しています。時間ディメンションは、パフォーマンスのために大きな違いはありませんでした単一のチャンク

dask.array<concatenate, shape=(101, 50, 2700, 3600), dtype=float64, 
chunksize=(101, 1, 270, 3600)> 

あるように私は読んへの書き込みを含めている(それがまだ終了する機能のために約20時間かかります(それをrechunkedていますディスク)。私は現在、唯一の私は両方の方法の相対的なパフォーマンスに興味があったと私のラップトップ上でテストを実行した例

dask.array<concatenate, shape=(101, 50, 2700, 3600), dtype=float64, 
chunksize=(1, 1, 2700, 3600)> 

、時間にチャンクしています。

import xarray as xr 
import numpy as np 
from scipy import stats 
import dask.array as dsa 

slope = 10 
intercept = 5 
t = np.arange(250) 
x = np.arange(10) 
y = np.arange(500) 
z = np.arange(200) 
chunks = {'x':10, 'y':10} 

noise = np.random.random([len(x), len(y), len(z), len(t)]) 
ones = np.ones_like(noise) 
time = ones*t 
data = (time*slope+intercept)+noise 
da = xr.DataArray(data, dims=['x', 'y', 'z', 't'], 
       coords={'x':('x', x), 
         'y':('y', y), 
         'z':('z', z), 
         't':('t', t)}) 
da = da.chunk(chunks) 
da 

ノーw timeseriesの勾配を計算するためにlinregressとpolyfitの両方を使用するプライベート関数のセットと、dask.apply_alongとxarray.apply_ufuncを使用する異なる実装を定義しました。計算タイミング

def _calc_slope_poly(y): 
    """ufunc to be used by linear_trend""" 
    x = np.arange(len(y)) 
    return np.polyfit(x, y, 1)[0] 


def _calc_slope(y): 
    '''returns the slop from a linear regression fit of x and y''' 
    x = np.arange(len(y)) 
    return stats.linregress(x, y)[0] 

def linear_trend_along(da, dim): 
    """computes linear trend over 'dim' from the da. 
     Slope and intercept of the least square fit are added to a new 
     DataArray which has the dimension 'name' instead of 'dim', containing 
     slope and intercept for each gridpoint 
    """ 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 
    trend = dsa.apply_along_axis(_calc_slope, axis_num, da.data) 
    return trend 

def linear_trend_ufunc(obj, dim): 
    trend = xr.apply_ufunc(_calc_slope, obj, 
          vectorize=True, 
          input_core_dims=[[dim]], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

def linear_trend_ufunc_poly(obj, dim): 
    trend = xr.apply_ufunc(_calc_slope_poly, obj, 
          vectorize=True, 
          input_core_dims=[[dim]], 
          output_core_dims=[[]], 
          output_dtypes=[np.float], 
          dask='parallelized') 

    return trend 

def linear_trend_along_poly(da, dim): 
    """computes linear trend over 'dim' from the da. 
     Slope and intercept of the least square fit are added to a new 
     DataArray which has the dimension 'name' instead of 'dim', containing 
     slope and intercept for each gridpoint 
    """ 
    da = da.copy() 
    axis_num = da.get_axis_num(dim) 
    trend = dsa.apply_along_axis(_calc_slope_poly, axis_num, da.data) 
    return trend 

trend_ufunc = linear_trend_ufunc(da, 't') 
trend_ufunc_poly = linear_trend_ufunc_poly(da, 't') 
trend_along = linear_trend_along(da, 't') 
trend_along_poly = linear_trend_along_poly(da, 't') 

apply_along方法がわずかに速いかもしれないことを示していると思われます。 linregressの代わりにpolyfitを使用するとかなり大きな影響を与えるようです。なぜこれがはるかに速いのか分かりませんが、これはあなたにとって興味深いかもしれません。ループ当たり180ミリ秒±

%%timeit 
print(trend_ufunc[1,1,1].data.compute()) 

4.89 S(STD平均±。DEV。7つのラン、1つのループ毎の)

%%timeit 
trend_ufunc_poly[1,1,1].compute() 

ループ当たり182ミリ秒±2.74秒(STD平均±。DEV。7つのラン、1つのループずつ)ループ当たり193ミリ秒±

%%timeit 
trend_along[1,1,1].compute() 

4.58秒(STD。DEV平均±7つのラン、1つのループずつ)

%%timeit 
trend_along_poly[1,1,1].compute() 

2.64秒±65ミリ秒あたりのループ(平均±標準誤差は7回、ループは1回)

関連する問題