私の関数を並列に配布する際に問題があります。Python Multiprocessing - 難解なユースケース、引数の受け渡しを含む
問題文:私は座標ペアのリストが2つあります。dfC
とdfO
です。 dfC
の各obsについては、r
の中にどれくらい多くのdfO
が入るかを数えています。 私は現在働いている機能を持っていますが、私はこれを並行して処理できるかどうか確認しようとしています。
これはキャッチです:dfC
は分割して個別に処理できますが、dfO
は各ワーカーで100%にする必要があります。私のアプローチは、これを最初に並行させてみましょう。次に、dfO
のフルコピーをどのようにして労働者に配布するかについて心配します。誰かが私が両方に取り組むのを助けることができないかぎり?
まず、ここまでのすべてを設定するコードです:ここで
import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, process
import traceback
from scipy.spatial import cKDTree
# create 2 dataframes with random "coordinates"
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy'))
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk'))
が
dfC
が
dfO
が同様
+----+----+
| x | y |
+----+----+
| 35 | 5 |
+----+----+
| 96 | 18 |
+----+----+
| 23 | 25 |
+----+----+
| 20 | 7 |
+----+----+
| 74 | 54 |
+----+----+
次を見ていきます、どのように見えるかの例ですが、ここのように働く機能ですチャーム。別々にすべての引数を渡すのではなく、私は実際にこのようにしています。つまり、これらを並列に呼び出すための主要な関数を準備することです(そうでなければ、マルチプロセッシングの方法を見つけることができませんでした)。
# this function works on dfC, and adds a row which counts the number
# of objects in dfO which are within radius r
def worker_job(args):
try:
dfC, dfO, newcol, r = args
mxC=dfC.as_matrix()
mxO = dfO.as_matrix()
# magic tree stuff
C_Tree = cKDTree(mxC)
O_Tree = cKDTree(mxO)
listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0)
counts=[]
for i in listoflists:
counts.append(len(i))
s = pd.Series(counts)
dfC[newcol] = s.values
except:
raise
traceback.print_exc()
else:
return dfC
私はこのように私の引数を作成する場合:私は自分自身でそれを実行したときに args=[dfC,dfO,"new_column_name",3]
それは完璧に動作します: worker_job(args)
+----+----+-----------------+
| x | y | new_column_name |
+----+----+-----------------+
| 35 | 5 | 4 |
+----+----+-----------------+
| 96 | 18 | 1 |
+----+----+-----------------+
| 23 | 25 | 0 |
+----+----+-----------------+
| 20 | 7 | 1 |
+----+----+-----------------+
| 74 | 54 | 2 |
+----+----+-----------------+
を今、私がしようとする関数を構築します並列作業者を制御し、この作業を並行して実行します。ここに私のベストエフォートです:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,args)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df
これは動作しません。 Run_Parallel(worker_job,2,args)
は約ValueError: not enough values to unpack (expected 4, got 2)
というエラーをスローします。ラッパーを通過するときに何かが起こっているはずです。
大きな問題を解決する方法を知っている人のために、私はこのエラーの指針を特に探しています。プールにはdfO
の100%とdfC
のサブセットが含まれている必要があります効率の
。 'args'リストを別のリストの中に入れ、それを' map'関数に渡さなければなりません。 'worker_job'関数を直接呼び出す以外の方法ではないことに気付くかもしれません。あなたのプログラムを再構成する必要があります。 – Himal