2016-12-29 1 views
1

大規模なデータベースを処理する必要のあるメソッドがあります。 を掘るのに数時間かかります。引数は(最大)Xの処理対象の(長い)リストに格納されています1バッチ。メソッドは何も返す必要はありませんが、私は "fun"の "True"を返します。python apply_asyncはメソッドを呼び出さない

私はそれを線形的に反復しているとき、関数は完全に動作していますここで)、apply_asyncまたはmap_asyncの作業を行うことができません。 (以前は他のプロジェクトでも働いていました) 私は何が間違っているのか、何らかのヒントがあれば感謝します。 以下のコードを参照してください:

import multiprocessing as mp 

    class mainClass: 
     #loads of stuff 

    def main(): 
     multiprocess = True 
     batchSize = 35 

     mC = mainClass() 
     while True: 
      toCheck = [key for key, value in mC.lCheckSet.items()] #the tasks are stored in a dictionary, I'm referring to them with their keys, which I turn to a list here for iteration. 
      if multiprocess == False: 
       #this version works perfectly fine 
       for i in toCheck[:batchSize]: 
        mC.check(i) 
      else: 
       #the async version does not, either with apply_async... 
       with mp.Pool(processes = 8) as pool: 
        temp = [pool.apply_async(mC.check, args=(toCheck[n],)) for n in range(len(toCheck[:batchSize]))] 
        results = [t.get() for t in temp] 

       #...or as map_async 
       pool = mp.Pool(processes = 8) 
       temp = pool.map_async(mC.check, toCheck[:batchSize]) 
       pool.close() 
       pool.join() 

    if __name__=="__main__": 
     main() 

答えて

0

「香り」ここには、あなたが一度だけ、メインプロセスであなたのmaincClassをインスタンス化し、別のプロセスにその上にメソッドを呼び出ししようとしているということです - しかし、注意したときにすることをプロセスプールにmC.checkを渡します。これは、このプロセスでインスタンス化されたクラスにすでにバインドされているメソッドです。

あなたの問題がどこにあるのでしょうか。それはおそらく仕事ができるが - そしてそれがない - 私は、この簡易版を作り、それが意図したとおりに動作します:

import multiprocessing as mp 
import random, time 

class MainClass: 
    def __init__(self): 
     self.value = 1 
    def check(self, arg): 
     time.sleep(random.uniform(0.01, 0.3)) 
     print(id(self),self.value, arg) 

def main(): 
    mc = MainClass() 

    with mp.Pool(processes = 4) as pool: 
     temp = [pool.apply_async(mc.check, (i,)) for i in range(8)] 
     results = [t.get() for t in temp] 

main() 

(あなただけの方法がまったく実行されていないことを確認するためにいくつかprint Sを追加しようとしている?) ですから、MainClassのいくつかの複雑な状態では、並列プロセスにはうまくいきません。 MultiProcessingでcurrent_processを取得し、このオブジェクトをネームスペースとして使用して、プロセス内のデータをワーカープールでインスタンス化し、さまざまな呼び出しで保持できるため、各プロセス内でメインクラスをインスタンス化することができます非同期を適用します。だから、

、1つの怒鳴るように新しいcheck関数を作成 - そしてその代わりmainprocessであなたのmainclassをインスタンス化する、プール内の各プロセス内でそれをインスタンス化:

import multiprocessing as mp 
import random, time 

def check(arg): 
    process = mp.current_process 
    if not hasattr(process, "main_class"): 
     process.main_class = MainClass() 
    process.main_class.check(arg) 


class MainClass: 
    def __init__(self): 
     self.value = random.randrange(100) 
    def check(self, arg): 
     time.sleep(random.uniform(0.01, 0.3)) 
     print(id(self),self.value, arg) 

def main(): 
    mc = MainClass() 

    with mp.Pool(processes = 2) as pool: 
     temp = [pool.apply_async(check, (i,)) for i in range(8)] 
     results = [t.get() for t in temp] 

main() 
+0

あなたはjsbuenoありがとう!いいえ、印刷物も実行されません。 mCには、check関数によって操作されるべき巨大なリストがいくつかあります。私はそれが彼らに何らかの変更を加えないことに気づいたので、あなたのヒント(彼らは同じインスタンスに接続されていません)が正しい方向かもしれません。同じ理由で、再インスタンス化でも残念ながら解決できません。 私の現在の回避策は、mCのテーブルを直接操作するのではなく、結果をフィードバックするようにcheck()を書き直すことです。できます。 しかし、もし私がそれらをmCのリストを直接操作できるようにするにはどうすればいいのかまだ分かりません。 – Adam

+0

続き:最初の例のように、チェックでテーブル(self.value = 1)を読み取ることができますが、それを書きたいと思います。それは何らかの理由で発生しません。 – Adam

関連する問題