2017-12-06 10 views
2

groupbyの結果である新しい列を作成し、データフレームの順序を維持しながら(または少なくともそれを元に戻すことができるように)別の列を適用したいと思います。dask groupby適用してからデータフレームにマージする

例:私はグループ

import dask 
import numpy as np 
import pandas as pd 
from dask import dataframe 

def normalize(x): 
    return ((x - x.mean())/x.std()) 


data = np.vstack([np.arange(2000), np.random.random(2000), np.round(np.linspace(0, 10, 2000))]).T 
df = dataframe.from_array(data, columns=['index', 'signal', 'id_group'], chunksize=100) 
df = df.set_index('index') 

normalized_signal = df.groupby('id_group').signal.apply(normalize, meta=pd.Series(name='normalized_signal_by_group')) 
normalized_signal.compute() 

によって信号列を正規化したい 私は右のシリーズを得るかが、インデックスがシャッフルされます。 私はこのシリーズをデータフレームに戻しますか?

私は

df['normalized_signal'] = normalized_signal 
df.compute() 

を試してみましたが、私はまた、マージを試みたが、私の最後のデータフレームは、インデックス

沿っ頼るする簡単な方法とシャッフル終わる

ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index.


を取得

df2 = df.merge(normalized_signal.to_frame(), left_index=True, right_index=True, how='left') 
df2.compute() 

パンダでsort_index()よりも系列を計算するときに機能しますが、効率的ではありません。

df3 = df.merge(normalized_signal.to_frame().compute().sort_index(), left_index=True, right_index=True, how='left') 
df3.compute() 

同等のパンダの方法は次のとおりです。

df4 = df.compute() 
df4['normalized_signal_by_group'] = df4.groupby('id_group').signal.transform(normalize) 
df4 
+0

"遅延"を使用してデータベースのグループ化されたチャンクをクエリすると、 "遅延から"を使用するのが効率的なソリューションになりますか? – AlexFC

答えて

1

は残念ながらtransformがまだDASKに実装されていません。

import numpy as np 
import pandas as pd 
import dask.dataframe as dd 

pd.options.mode.chained_assignment = None 

def normalize(x): 
    return ((x - x.mean())/x.std()) 

def dask_norm(gp): 
    gp["norm_signal"] = normalize(gp["signal"].values) 
    return(gp.as_matrix()) 

data = np.vstack([np.arange(2000), np.random.random(2000), np.round(np.linspace(0, 10, 2000))]).T 
df = dd.from_array(data, columns=['index', 'signal', 'id_group'], chunksize=100) 
df1 = df.groupby("id_group").apply(dask_norm, meta=pd.Series(name="a")) 
df2 = df1.to_frame().compute() 
df3 = pd.concat([pd.DataFrame(a) for a in df2.a.values]) 
df3.columns = ["index", "signal", "id_group", "normalized_signal_by_group"] 
df3.sort_values("index", inplace=True) 
関連する問題