2017-01-17 28 views
0

私はデータフレームを含むリストを持っています。ループ内で私は、このリストのクリーンアップの上にリスト内の各データフレームを反復処理し、別のリストにダンプし、そのリストを返す:データフレーム付きのPython並列処理リスト

allDfs = [] 
def processDfs(self): 

    for df in listOfDfs(): 
     for column_name in need_to_change_column_name: 
      ...# some column name changes 
     df.set_index('id', inplace=True) 

     ## dropping any na 
     df = df.dropna() 
     ... 

     df['cost'] = df['cost'].astype('float64') 

     allDfs.append(df) 

    return allDfs 

にはどうすれば複数のスレッド間でlistOfDfs内の各データフレームの処理を分散しますか?それを収集し、プロセスdfsのリストを返す。

+1

:あなたはすぐにallDfsを必要としない場合

あなたの代わりに最後の行にpool.map_asyncを使用することができます(並列処理はその作業を行いながら、いくつかの他のものを計算する先に行くさせていただきます)これは並列化によってパンダを加速する伝統的な方法でしたが、最近では[無料](dask.pydata.org/en/latest/)は、あなたが「無料で」実現しようとするものすべてを解決します。このライブラリをプロジェクトで使用できるパッケージに追加できる場合は、このライブラリを使用することを検討する必要があります。 – Boud

答えて

1

マルチプロセッシングモジュールを使用します。

from multiprocessing import Pool 

# enter the desired number of processes here 
NUM_PROCS = 8  

def process_single_df(df): 
    """ 
    Function that processes a single df. 
    """ 
    for column_name in need_to_change_column_name: 
     # some column name changes 
     ... 

    df.set_index('id', inplace=True) 

    ## dropping any na 
    df = df.dropna() 
    ... 

    df['cost'] = df['cost'].astype('float64') 

    return df 

pool = Pool(processes=NUM_PROCS) 

allDfs = pool.map(process_single_df, listOfDfs) 

呼び出しがpool.mapし、それはプログラムを続行する前に完了するために、すべてのプロセスを待ちます意味、ブロックしています。私が知っている

# get async result instead (non-blocking) 
async_result = pool.map_async(process_single_df, listOfDfs) 
# do other stuff 
... 
# ok, now I need allDfs so will call async_result.get 
allDfs = async_result.get()