2017-06-22 1 views
1

私は2つのグループを持っています.1つはグループとして処理される行があり、もう1つはグループになります。マルチプロセッシンググループがPythonを適用する

現在、私はこの上でマルチプロセッシング行う方法を知りませんが、グループのすべての行に

gr1.apply(lambda x: custom_func(x.Address1, gr2.get_group(x.name))) 

を私の関数を適用しています

test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

gr1 = test.groupby('city') 
gr2 = test2.groupby('city') 

。お知らせ下さい。

EDIT: - daskを使用しようとしましたが、データフレーム全体をdaskの関数に渡すことはできません - apply機能には制限があります。そして私はgr1(グループ)にdaskを適用しようとしましたが、カスタム関数でインデックスを設定しているので、 "インデックスが多すぎます"というエラーがスローされます。

ここDASKと、これは私にエラーを与える - 'Pandas' object has no attribute 'city'

ddf1 = dd.from_pandas(test, 2) 
ddf2 = dd.from_pandas(test2, 2) 

dgr1 = ddf1.groupby('city') 
dgr2 = ddf2.groupby('city') 

meta = pd.DataFrame(columns=['Address1', 'score', 'idx','source_index']) 
ddf1.map_partitions(custom_func, x.Address1, dgr2.get_group(x.city).Address1,meta=meta).compute() 
+0

「dask」を見ると、それはpandasとよく統合されています。 – suvy

+0

うん、それを見たが、daskは、適用機能でデータフレームを渡すことをサポートしていない。 2つ目は、グループにdaskを適用しようとしたとき、私はcustom_funcの中にインデックスを設定しようとしているので、「インデクサが多すぎます」と失敗します。 –

+0

dask applyは、列を使用してmap_partitionを使用する場合は、行を有効にする必要があります。あなたが試したこととエラーが報告されたあなたの質問を編集するのはクールですか? – suvy

答えて

2

私はここにDASKを使用する代わりにソリューションを提供し、

import pandas as pd 
from multiprocessing import Pool 
test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

test=test.assign(dataset = 'test') 
test2=test2.assign(dataset = 'test2') 

newdf=pd.concat([test2,test],keys = ['test2','test']) 
gpd=newdf.groupby('city') 
def my_func(mygrp): 
    test_data=mygrp.loc['test'] 
    test2_data=mygrp.loc['test2'] 
    #do something specific 
    #if needed print something 
    return {'Address':test2_data.Address1.values[0],'ID':test2_data.ID.values[0]} #return some other stuff 

mypool=Pool(processes=2) 
ret_list=mypool.imap(my_func,(group for name, group in gpd)) 

pd.DataFrame(ret_list) 

返します

ID address 
0 3 234 kookie Pl 
1 1 123 chese wy 
2 8 456 Pretzel Junktion 
3 4 345 Pizzza DR 

のようなものをPS:OPの質問では、2つの同様のデータセットが特殊な関数で比較されます。 ses pandas.concat。問題に応じてpd.mergeを想像することもできます。

+0

ねえ。ご協力いただきありがとうございます。このメモリエラーが発生し続ける。メインののファイル "script.py"ファイルret_list = mypool.map(my_func、)は、 "script.py"ファイル内の "script.py" (名前のグループ、gpdのグループ)) ファイル "/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py"、ライン251、マップ return self.map_async(func、iterable、chunksize) .get() ファイル "/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py"、行567、取得先 raise self._value MemoryError' –

+0

私は32GBのRAMと256GBのディスクを持っていますが私が思うに、pool.mapはデータ全体をすべてのサブプロセスにコピーし、メモリの問題を引き起こしていると思います。親プロセスのデータは必要ありません。私は何を渡すからt。どのように達成するのですか? –

+0

Yaaマップは集中的なイテレータのリストを作成するので、代わりにmypool.imapを使用してください。 – suvy

関連する問題