2016-04-04 6 views
5

エラー\例外がある場合はすべてのプロセスを\ killする必要があります。私はpsutilを使用してすべてのプロセスを殺すためにStackOwerflowソリューションで見つけましたが、時々私は問題を抱えています - psutilが子プロセスとメインプロセスを終了すると、新しいプロセスが開始し、多重処理で一度に生成されたすべてのプロセスを強制停止します。プール

import psutil 

class MyClass: 
    parent_pid = 0 
    ids_list = range(300) 

    def main(self): 
     self.parent_pid = os.getpid() 
     pool = multiprocessing.Pool(3) 

     for osm_id in self.ids_list: 
      pool.apply_async(self.handle_country_or_region, 
          kwds=dict(country_id=osm_id), 
          error_callback=self.kill_proc_tree) 

     pool.close() 
     pool.join() 

    def kill_proc_tree(self, including_parent=True): 
     parent = psutil.Process(self.parent_pid) 
     children = parent.children(recursive=True) 

     for child in children: 
      child.kill() 
     psutil.wait_procs(children, timeout=5) 

     if including_parent: 
      parent.kill() 
      parent.wait(5) 

    def handle_country_or_region(self, country_id=None, queue=None): 
     pass 
     # here I do some task 

(すなわちずに「私がプールを終了するのではなくプロセスをkillする必要があるようですが、この場合には、私は

pool.close() 
pool.terminate() 
pool.join() 

をすれば、私の端末が何をやって停止し、新しいラインが完全に空であります>>> ")、何も起こりません。

理想的には、次の流れが必要です。エラー\例外がある場合は、すべてのコード実行を停止し、ターミナルの対話型プロンプトに戻ります。

誰でも私を正常に動作させるのに役立つことができますか? 私はPython 3.5とUbuntu 15.10を使用しています

答えて

0

解決策は非常に簡単です。 'main'の中に 'killer'関数を入れてください。誰でも以下、queueを使用する必要がある場合はゾンビプロセスを持つ回避正しい方法でqueueを終了する方法を示しており、コードの拡張変形である

class MyClass: 
    ids_list = range(300) 

    def main(self): 
     pool = multiprocessing.Pool(3) 

     def kill_pool(err_msg): 
      print(err_msg) 
      pool.terminate() 

     for osm_id in self.ids_list: 
      pool.apply_async(self.handle_country_or_region,  
          kwds=dict(country_id=osm_id), 
          error_callback=kill_pool) 

     pool.close() 
     pool.join() 

    def handle_country_or_region(self, country_id=None, queue=None): 
     pass # here I do some task 

完全なコードは次のようになります

import pickle 
import os 
import multiprocessing 

class MyClass: 
    ids_list = range(300) 
    folder = os.path.join(os.getcwd(), 'app_geo') 
    STOP_TOKEN = 'stop queue' 

    def main(self): 

     # >>> Queue part shared between processes <<< 
     manager = multiprocessing.Manager() 
     remove_id_queue = manager.Queue() 

     remove_id_process = multiprocessing.Process(target=self.remove_id_from_file, 
                args=(remove_id_queue,)) 
     remove_id_process.start() 
     # >>> End of queue part <<< 

     pool = multiprocessing.Pool(3) 

     def kill_pool(err_msg): 
      print(err_msg) 
      pool.terminate() 

     for osm_id in self.ids_list: 
      pool.apply_async(self.handle_country_or_region,  
          kwds=dict(country_id=osm_id), 
          error_callback=kill_pool) 

     pool.close() 
     pool.join() 

     # >>> Anti-zombie processes queue part <<< 
     remove_id_queue.put(self.STOP_TOKEN) 
     remove_id_process.join() 
     manager.shutdown() 
     # >>> End 

    def handle_country_or_region(self, country_id=None, queue=None): 
     # here I do some task 
     queue.put(country_id) 

    def remove_id_from_file(self, some_queue): 
     while True: 
      osm_id = some_queue.get() 
      if osm_id == self.STOP_TOKEN: 
       return 
      self.ids_list.remove(osm_id) 
      with open(self.folder + '/ids_list.pickle', 'wb') as f: 
       pickle.dump(self.ids_list, f) 
+0

あまりにも悪い私はこれをPython 2で行うことができません: –

+0

@Adi、申し訳ありません、私はPython 2で動作しませんでしたので、助けにはなりません。 'Python 2.7.12'で' '私が使ったすべてのメソッド - https://docs.python.org/2/library/ multiprocessing.html – TitanFighter

+0

python2にerror_callbackは存在しません。 –

関連する問題