2016-10-25 8 views
0

おはようございます、Dask - 大量のメモリを使用しているRechunkまたはArrayスライシング?

私は、Dask処理チェーンで過度の(またはそうでないかもしれない)メモリ使用量を理解するのに助けを求めていました。

master_arrayは(この例では703、57600001ポイント)メモリに保持するのには大きすぎる Dask Arrayある
def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

問題は、次の関数の実行から来ています。

最小限の例として、以下のコンテキストでコードを入れて

import dask.array as da 
import numpy as np 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Fabricate an input array of the same shape and size as the problematic dataset 
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 

# Execute the create_fft_arrays function 
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5) 

以下の完全なコードと同じメモリ使用量が発生し、次のコードの実行が最大に私のRAM(の20Gb)を引き起こします最後の行fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5)実行するときにアウト:この後

import dask.array as da 

import h5py as h5 
import numpy as np 

import os 

FORMAT = '.h5' 
DSET_PATH = '/DAS/Data' 
TSET_PATH = '/DAS/Time' 

FFT_SIZE = 2**15 
OVERLAP = 0.5 

input_dir = r'D:\' 
file_paths = [] 

# Get list of all valid files in directory 
for dir_name, sub_dir, f_name in os.walk(input_dir): 
    for f in f_name: 
     if f[-1*len(FORMAT):] == FORMAT: 
      file_paths.append(os.path.join(dir_name, f)) 

#H5 object for each file 
file_handles = [h5.File(f_path, 'r') for f_path in file_paths] 

# Handle for dataset and timestamps from each file 
dset_handles = [f[DSET_PATH] for f in file_handles] 
tset_handles = [f[TSET_PATH] for f in file_handles] 

# Create a Dask Array object for each dataset and timestamp set 
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles] 
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles] 

# Concatenate all datasets along along the time axis 
master_array = da.concatenate(dset_arrays, axis = 1) 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Break master_array into FFT sized arrays with a single chunk in each 
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5) 

を、私はこれらのFFTアレイのそれぞれの周波数応答を計算するda.fft.fftメソッドを使用するために行くだろう。それはに良いですので

任意のヘルプや提案がいただければ幸いです

ジョージ

+0

[mcve](http://stackoverflow.com/help/mcve)を作成することができれば、より早くより良い応答を得ることができます。 – MRocklin

+0

改訂されました。うまくいけば、少しはっきりしているかもしれません。 –

+0

これは、行単位で見ると、大量のメモリを使用している行のようです。fft_arrays = [reshape_array [x:x + FFT_SIZE] for x in fft_index]。 –

答えて

0

あなたのマスター配列は、非常に多くのチャンク

>>> master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 
>>> master_array.npartitions 
154839 

各チャンクにはいくつかの管理オーバーヘッドがありますがありますその数をこれよりもいくらか小さくしてください。これはsection on chunksからdask.array documentation

この配列に何千回もスライスしようとするとボトルネックになります。

チャンクサイズを大きくすると、問題が多少解決する可能性があります。上記にリンクされた文書は、いくつかの推奨事項を提供します。

+0

アドバイスをいただきありがとうございます。 –

+0

アドバイスをいただきありがとうございます。私はそれがh5チャンクサイズであるので、そのチャンクサイズで始めました。最終的にディスクからのデータIOがボトルネックになりますが、このワークフロースケールをEC2インスタンス上で実行するようにDaskを試してみたかったのです。 'master_array'のチャンクサイズを上げると、少なくともグラフを作成することができます。助けてくれてありがとう。 –

関連する問題