2017-03-02 15 views
0

私は列を持つpandas DataFrameを取得しました。画像はnumpyの2D配列として格納されています。pandasをプロデュースするDataFrameのnumpy.arraysシリーズをdaskと並行して

ヒストグラムを持つシリーズまたはDataFrameを、同じ列にもう一度、daskと並行して配置する必要があります。

サンプルコード:

import numpy as np 
import pandas as pd 
import dask.dataframe as dd 

def func(data): 
    result = np.histogram(data.image.ravel(), bins=128)[0] 
    return result 


n = 10 
df = pd.DataFrame({'image': [(np.random.random((60, 24)) * 255).astype(np.uint8) for i in np.arange(n)], 
       'n1': np.arange(n), 
       'n2': np.arange(n) * 2, 
       'n3': np.arange(n) * 4 
       } 
      ) 
print 'DataFrame\n', df 

hists = pd.Series([func(r[1]) for r in df.iterrows()]) 

# MAX_PROCESSORS = 4 
# ddf = dd.from_pandas(df, npartitions=MAX_PROCESSORS) 
# hists = ddf.apply(func, axis=1, meta=pd.Series(name='data', dtype=np.ndarray)).compute() 

print 'Histograms \n', hists 

所望の出力

DataFrame 
               image n1 n2 n3 
0 [[51, 254, 167, 61, 230, 135, 40, 194, 101, 24... 0 0 0 
1 [[178, 130, 204, 196, 80, 97, 61, 51, 195, 38,... 1 2 4 
2 [[122, 126, 47, 31, 208, 130, 85, 189, 57, 227... 2 4 8 
3 [[185, 141, 206, 233, 9, 157, 152, 128, 129, 1... 3 6 12 
4 [[131, 6, 95, 23, 31, 182, 42, 136, 46, 118, 2... 4 8 16 
5 [[111, 89, 173, 139, 42, 131, 7, 9, 160, 130, ... 5 10 20 
6 [[197, 223, 15, 40, 30, 210, 145, 182, 74, 203... 6 12 24 
7 [[161, 87, 44, 198, 195, 153, 16, 195, 100, 22... 7 14 28 
8 [[0, 158, 60, 217, 164, 109, 136, 237, 49, 25,... 8 16 32 
9 [[222, 64, 64, 37, 142, 124, 173, 234, 88, 40,... 9 18 36 
Histograms 
0 [81, 87, 80, 94, 99, 79, 86, 90, 90, 113, 96, ... 
1 [93, 76, 103, 83, 76, 101, 85, 83, 96, 92, 87,... 
2 [84, 93, 87, 113, 83, 83, 89, 89, 114, 92, 86,... 
3 [98, 101, 95, 111, 77, 92, 106, 72, 91, 100, 9... 
4 [95, 96, 87, 82, 89, 87, 99, 82, 70, 93, 76, 9... 
5 [77, 94, 95, 85, 82, 90, 77, 92, 87, 89, 94, 7... 
6 [73, 86, 81, 91, 91, 82, 96, 94, 112, 95, 74, ... 
7 [88, 89, 87, 88, 76, 95, 96, 98, 108, 96, 92, ... 
8 [83, 84, 76, 88, 96, 112, 89, 80, 93, 94, 98, ... 
9 [91, 78, 85, 98, 105, 75, 83, 66, 79, 86, 109,... 

あなたはdask.DataFrame.applyを呼び出すと、コメント行を見ることができます。私はそれらをコメント解除した場合、私は例外にdask.async.ValueError: Shape of passed values is (3, 128), indices imply (3, 4)

を持って、ここでは例外スタックできました:

File "C:\Anaconda\envs\MBA\lib\site-packages\dask\base.py", line 94, in compute 
    (result,) = compute(self, traverse=False, **kwargs) 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\base.py", line 201, in compute 
    results = get(dsk, keys, **kwargs) 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\threaded.py", line 76, in get 
    **kwargs) 
    File "C:\Anaconda\envs\MBA\lib\site-packages\dask\async.py", line 500, in get_async 
    raise(remote_exception(res, tb)) 
dask.async.ValueError: Shape of passed values is (3, 128), indices imply (3, 4) 

は、どのように私はそれを克服することができますか?

私の目標は、このデータフレームを並行して処理することです。

+0

を包むリストの内包表記やジェネレータ式に比べて2-4倍のスピードアップできます?これは、データフレームを使用するよりも簡単です。 – MRocklin

+0

@MRocklin:まだありません。明日(今日は8.30 PMです) – wl2776

+0

これはうまくいきます: 'dhists = delayed([dunc.iterrows()]のrに対してdelayed([delayed(func、pure = True)(r [1])); hists = pd.Series(dhists.compute(get = dask.multiprocessing.get)) ' しかし、データフレームの反復にはまだ時間がかかります。 – wl2776

答えて

0

map_partitionsです。数日間の実験と時間測定の後、私は次のコードに行きました。それはあなたが[dask.delayed](http://dask.pydata.org/en/latest/delayed.html)を使用して考えがありpandas.DataFrame.itertuples

def func(data): 
    filtered = # filter data.image 
    result = np.histogram(filtered) 
    return result 


def func_partition(data, additional_args): 
    result = data.apply(func, args=(bsifilter,), axis=1) 
    return result 

if __name__ == '__main__': 
    dask.set_options(get=dask.multiprocessing.get) 
    n = 30000 
    df = pd.DataFrame({'image': [(np.random.random((180, 64)) * 255).astype(np.uint8) for i in np.arange(n)], 
        'n1': np.arange(n), 
        'n2': np.arange(n) * 2, 
        'n3': np.arange(n) * 4 
        } 
       ) 


    ddf = dd.from_pandas(df, npartitions=MAX_PROCESSORS) 
    dhists = ddf.map_partitions(func_partition, bfilter, meta=pd.Series(dtype=np.ndarray)) 
    print 'Delayed dhists = \n', dhists 
    hists = pd.Series(dhists.compute()) 
関連する問題