2016-05-15 18 views
1

私はかなりマルチプロセッシングが新しく、以下のスクリプトを書いていますが、メソッドが呼び出されていません。私は何が欠けているのか分かりません。非同期Pythonでのマルチプロセッシングでワーカー関数が呼び出されない

  1. コール二つの異なる方法:私は何をしたいか

    は次のとおりです。

  2. 他の方法の前に1つのメソッドを呼び出します。

    # import all necessary modules 
        import Queue 
        import logging 
        import multiprocessing 
        import time, sys 
        import signal 
    
        debug = True 
    
        def init_worker(): 
         signal.signal(signal.SIGINT, signal.SIG_IGN) 
    
        research_name_id = {} 
        ids = [55, 125, 428, 429, 430, 895, 572, 126, 833, 502, 404] 
        # declare all the static variables 
        num_threads = 2 # number of parallel threads 
    
        minDelay = 3 # minimum delay 
        maxDelay = 7 # maximum delay 
    
        # declare an empty queue which will hold the publication ids 
        queue = Queue.Queue(0) 
    
    
        proxies = [] 
        #print (proxies) 
    
        def split(a, n): 
         """Function to split data evenly among threads""" 
         k, m = len(a)/n, len(a) % n 
         return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] 
           for i in xrange(n)) 
        def run_worker(
          i, 
          data, 
          queue, 
          research_name_id, 
          proxies, 
          debug, 
          minDelay, 
          maxDelay): 
         """ Function to pull out all publication links from nist 
         data - research ids pulled using a different script 
         queue - add the publication urls to the list 
         research_name_id - dictionary with research id as key and name as value 
         proxies - scraped proxies 
         """ 
         print 'getLinks', i 
         for d in data: 
          print d 
          queue.put(d) 
    
    
    
    
        def fun_worker(i, queue, proxies, debug, minDelay, maxDelay): 
         print 'publicationData', i 
         try: 
          print queue.pop() 
         except: 
          pass 
    
    
    
    
        def main(): 
         print "Initializing workers" 
         pool = multiprocessing.Pool(num_threads, init_worker) 
         distributed_ids = list(split(list(ids), num_threads)) 
         for i in range(num_threads): 
          data_thread = distributed_ids[i] 
          print data_thread 
          pool.apply_async(run_worker, args=(i + 1, 
            data_thread, 
            queue, 
            research_name_id, 
            proxies, 
            debug, 
            minDelay, 
            maxDelay, 
           )) 
    
          pool.apply_async(fun_worker, 
           args=(
            i + 1, 
            queue, 
            proxies, 
            debug, 
            minDelay, 
            maxDelay, 
           )) 
    
         try: 
          print "Waiting 10 seconds" 
          time.sleep(10) 
    
         except KeyboardInterrupt: 
          print "Caught KeyboardInterrupt, terminating workers" 
          pool.terminate() 
          pool.join() 
    
         else: 
          print "Quitting normally" 
          pool.close() 
          pool.join() 
    
        if __name__ == "__main__": 
         main() 
    

私が得る唯一の出力は

 Initializing workers 
     [55, 125, 428, 429, 430, 895] 
     [572, 126, 833, 502, 404] 
     Waiting 10 seconds 
     Quitting normally 

答えて

0

問題のカップルがあります:

  1. あなたはmultiprocessing.Queue
  2. を使用していない、あなたがしたい場合はapply_asyncなどを介してサブプロセスとキューを共有する場合は、マネージャ012を使用する必要があります。

ただし、元に戻って自分が行っていることを尋ねる必要があります。 apply_asyncは本当に行く方法ですか?反復的にマップしたい項目のリストがあり、計算を集中的に実行する長時間実行される変換を適用します(I/Oをブロックするだけの場合は、スレッドを使用することもできます)。 imap_unorderedはあなたが望むものを実際にあるように私には思える:

pool = multiprocessing.Pool(num_threads, init_worker) 
links = pool.imap_unordered(run_worker1, ids) 
output = pool.imap_unordered(fun_worker1, links) 

run_worker1fun_worker1は、単一の引数を取るように変更する必要があります。他のデータを共有する必要がある場合は、それをサブプロセスに繰り返し渡すのではなく、イニシャライザに渡す必要があります。

+0

ご意見ありがとうございます。私はまた、複数のプロセスを開始したい。それを行う正しい方法はapply_asyncですか?私はimpa_unorderedについてもっと読む – nEO

関連する問題