2017-07-20 6 views
1

私の関数を並列に配布する際に問題があります。Python Multiprocessing - 難解なユースケース、引数の受け渡しを含む

問題文:私は座標ペアのリストが2つあります。dfCdfOです。 dfCの各obsについては、rの中にどれくらい多くのdfOが入るかを数えています。 私は現在働いている機能を持っていますが、私はこれを並行して処理できるかどうか確認しようとしています。

これはキャッチです:dfCは分割して個別に処理できますが、dfOは各ワーカーで100%にする必要があります。私のアプローチは、これを最初に並行させてみましょう。次に、dfOのフルコピーをどのようにして労働者に配布するかについて心配します。誰かが私が両方に取り組むのを助けることができないかぎり?

まず、ここまでのすべてを設定するコードです:ここで

import pandas as pd 
import numpy as np 
import multiprocessing as mp 
from multiprocessing import Pool, process 
import traceback 
from scipy.spatial import cKDTree 

# create 2 dataframes with random "coordinates" 
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy')) 
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk')) 

dfCdfOが同様

+----+----+ 
| x | y | 
+----+----+ 
| 35 | 5 | 
+----+----+ 
| 96 | 18 | 
+----+----+ 
| 23 | 25 | 
+----+----+ 
| 20 | 7 | 
+----+----+ 
| 74 | 54 | 
+----+----+ 

次を見ていきます、どのように見えるかの例ですが、ここのように働く機能ですチャーム。別々にすべての引数を渡すのではなく、私は実際にこのようにしています。つまり、これらを並列に呼び出すための主要な関数を準備することです(そうでなければ、マルチプロセッシングの方法を見つけることができませんでした)。

# this function works on dfC, and adds a row which counts the number 
# of objects in dfO which are within radius r 
def worker_job(args): 
    try: 
     dfC, dfO, newcol, r = args 

     mxC=dfC.as_matrix() 
     mxO = dfO.as_matrix() 

     # magic tree stuff 
     C_Tree = cKDTree(mxC) 
     O_Tree = cKDTree(mxO) 

     listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0) 

     counts=[] 
     for i in listoflists: 
      counts.append(len(i)) 

     s = pd.Series(counts) 

     dfC[newcol] = s.values 

    except: 
     raise 
     traceback.print_exc() 
    else: 
     return dfC 

私はこのように私の引数を作成する場合:私は自分自身でそれを実行したときに args=[dfC,dfO,"new_column_name",3]

それは完璧に動作します: worker_job(args)

+----+----+-----------------+ 
| x | y | new_column_name | 
+----+----+-----------------+ 
| 35 | 5 |  4  | 
+----+----+-----------------+ 
| 96 | 18 |  1  | 
+----+----+-----------------+ 
| 23 | 25 |  0  | 
+----+----+-----------------+ 
| 20 | 7 |  1  | 
+----+----+-----------------+ 
| 74 | 54 |  2  | 
+----+----+-----------------+ 

を今、私がしようとする関数を構築します並列作業者を制御し、この作業を並行して実行します。ここに私のベストエフォートです:

# this function should control the multiprocessing 
def Run_Parallel(Function, Num_Proc, args): 
    try: 
     pool = Pool(Num_Proc) 
     parts = pool.map(Function,args) 
     pool.close() 
     pool.join() 

     results_df = pd.concat(parts) 

    except: 
     pool.close() 
     pool.terminate() 
     traceback.print_exc() 
    else: 
     return results_df 

これは動作しません。 Run_Parallel(worker_job,2,args)は約ValueError: not enough values to unpack (expected 4, got 2)というエラーをスローします。ラッパーを通過するときに何かが起こっているはずです。

大きな問題を解決する方法を知っている人のために、私はこのエラーの指針を特に探しています。プールにはdfOの100%とdfCのサブセットが含まれている必要があります効率の

+1

。 'args'リストを別のリストの中に入れ、それを' map'関数に渡さなければなりません。 'worker_job'関数を直接呼び出す以外の方法ではないことに気付くかもしれません。あなたのプログラムを再構成する必要があります。 – Himal

答えて

1

答えは、引数をリストのリストとして渡すことでした。これはまた、データフレームを分割するという別の問題を解決しました(私はプールがデフォルトでこれを処理したと思っていましたが、そうではありません)。

正しい関数は次のようになります。 `Pool.map`は` iterable`を期待

# this function should control the multiprocessing 
def Run_Parallel(Function, Num_Proc, args): 
    dfC, dfO, newcol, r = args 

    # to make lists of lists 
    argslist=[] 
    dfOlist=[] 
    dfClist=[] 
    resultlist=[] 

    # split dfC into parts 
    Cparts=np.array_split(dfC, Num_Proc) 

    # build the lists 
    for i in range(Num_Proc): 
     argslist.append([Cparts[i],dfO,newcol,r]) 


    try: 
     pool = Pool(Num_Proc) 
     parts = pool.map(Function,argslist) 
     pool.close() 
     pool.join() 

     results_df = pd.concat(parts) 

    except: 
     pool.close() 
     pool.terminate() 
     traceback.print_exc() 
    else: 
     return results_df 
関連する問題