2017-08-07 17 views

答えて

7

最も簡単な方法はDask's map_partitionsです。

import pandas as pd 
import dask.dataframe as dd 
from dask.multiprocessing import get 

をし、構文は

data = <your_pandas_dataframe> 
ddata = dd.from_pandas(data, npartitions=30) 

def myfunc(x,y,z, ...): return <whatever> 

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)).compute(get=get) 

(私はあなたが16個のコアを持っている場合、30パーティションの適切な数であると信じている)である:あなたは(あなたがpip install daskにする必要があります)これらの輸入を必要としています。

data = pd.DataFrame() 
data['col1'] = np.random.normal(size = 1500000) 
data['col2'] = np.random.normal(size = 1500000) 

ddata = dd.from_pandas(data, npartitions=30) 
def myfunc(x,y): return y*(x**2+1) 
def apply_myfunc_to_DF(df): return df.apply((lambda row: myfunc(*row)), axis=1) 
def pandas_apply(): return apply_myfunc_to_DF(data) 
def dask_apply(): return ddata.map_partitions(apply_myfunc_to_DF).compute(get=get) 
def vectorized(): return myfunc(data['col1'], data['col2'] ) 

t_pds = timeit.Timer(lambda: pandas_apply()) 
print(t_pds.timeit(number=1)) 

28.16970546543598

t_dsk = timeit.Timer(lambda: dask_apply()) 
print(t_dsk.timeit(number=1)) 

2.708152851089835

t_vec = timeit.Timer(lambda: vectorized()) 
print(t_vec.timeit(number=1)) 
:ちょうど完全を期すために、私は私のマシン上の違い(16コア)を時限はパンダから行く10のスピードアップの要因 を与える

0.010668013244867325

はパーティションに適用DASKに適用されます。もちろん、ベクトル化できる関数がある場合は、関数(y*(x**2+1))は単純にベクトル化されますが、ベクトル化することは不可能なことがたくさんあります。

+2

投稿していただきありがとうございます。 30パーティションを選んだ理由を説明できますか?この値を変更するとパフォーマンスが変わりますか? –

+0

@AndrewL私は、各パーティションが別々のプロセスで処理されていると仮定し、16個のコアでは16または32個のプロセスを同時に実行できると仮定します。 私はそれを試してみましたが、パフォーマンスは32パーティションまで改善されていますが、それ以上の増加は有益な効果はありません。私は、クアッドコアのマシンでは8つのパーティションなどが必要であると仮定しています。 16と32の間に改善が見られましたので、本当に2倍の$ NUM_PROCESSORS –

関連する問題