2017-11-30 22 views
-1

マルチプロセッシングモジュールではなく、python3でパススルーで非デーモンプロセスを実装するにはどうしたらいいですか?Python pathosプロセスプール非デーモンですか?

具体的には、私はを参照しています:この投稿への答えはマルチプロセッシングモジュールを介して非鬼神のプロセスを実装し Python Process Pool non-daemonic?

。残念ながら、このモジュールは、他のオブジェクト間ラムダ機能を酸洗いすることはできませんが、哀愁は、Python 2で行います

#import multiprocessing 
#import multiprocessing.pool 
import pathos 

#class NoDaemonProcess(multiprocessing.Process): 
class NoDaemonProcess(pathos.multiprocessing.Pool.Process): 
    def _get_daemon(self): 
     return False 
    def _set_daemon(self, value): 
     pass 
    daemon = property(_get_daemon, _set_daemon) 

#class NoDaemonPool(multiprocessing.pool.Pool): 
class NoDaemonPool(pathos.multiprocessing.Pool): 
    Process = NoDaemonProcess 

def myproc(args): 
    i, max_workers = args 
    #pool = multiprocessing.Pool(max_workers) 
    pool = pathos.pools.ProcessPool(max_workers) 
    l_args = [j for j in range(i)] 
    mysubproc = lambda x : x 
    print("myproc", l_args, pool.map(mysubproc, l_args)) 
    return i 

max_workers = [2, 1] 
executor = NoDaemonPool(max_workers[0]) 
#executor = pathos.multiprocessing.Pool(max_workers[0]) 
l_args = [(i, max_workers[1]) for i in range(10)] 
print(executor.map(myproc, l_args)) 

出力:

('myproc', [], []) 
('myproc', [0, 1], [0, 1]) 
('myproc', [0], [0]) 
('myproc', [0, 1, 2], [0, 1, 2]) 
('myproc', [0, 1, 2, 3], [0, 1, 2, 3]) 
('myproc', [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5]) 
('myproc', [0, 1, 2, 3, 4], [0, 1, 2, 3, 4]) 
('myproc', [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6]) 
('myproc', [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7]) 
('myproc', [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8]) 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 

はPython 3では、哀愁モジュールが変更されたw.r.t. Python 2、例えば pathos.multiprocessing.Pool.Process はもはやクラスではなく、関数であるため、継承のためにこれを使用することはできません(上記を参照)。 - 私が紛失している文書はありますか?

上記のコードをPython 3のpathosで動作させるにはどうすればよいですか?

import multiprocessing 
import multiprocessing.pool 
import pathos 

class NoDaemonProcess(multiprocessing.Process): 
#class NoDaemonProcess(pathos.multiprocessing.Pool.Process): 
    def _get_daemon(self): 
     return False 
    def _set_daemon(self, value): 
     pass 
    daemon = property(_get_daemon, _set_daemon) 

class NoDaemonPool(multiprocessing.pool.Pool): 
#class NoDaemonPool(pathos.multiprocessing.Pool): 
    Process = NoDaemonProcess 

def myproc(args): 
    i, max_workers = args 
    #pool = multiprocessing.Pool(max_workers) 
    pool = pathos.pools.ProcessPool(max_workers) 
    l_args = [j for j in range(i)] 
    mysubproc = lambda x : x 
    print("myproc", l_args, pool.map(mysubproc, l_args)) 
    return i 

max_workers = [2, 1] 
executor = NoDaemonPool(max_workers[0]) 
#executor = pathos.multiprocessing.Pool(max_workers[0]) 
l_args = [(i, max_workers[1]) for i in range(10)] 
print(executor.map(myproc, l_args)) 

しかし、この作業は、周りにある:上記の特定の例のための回避策は、一方が単にマルチプロセッシングNoDaemonPool実装にフォールバック、およびデーモンサブプロセスに哀愁を使用することができるよう

ない解決ので (ⅰ)その輸入哀愁とマルチプロセッシング、そしてさらに重要な (ⅱ)MYPROCが代わりに

myproc = lambda x : x 

として定義されている場合には、例えば、酸洗いすることはできませんありがとうござい版の両方yはずっと、 ベスト、 セバスチャン

+0

は、より具体的に、人々はあなたが質問をする前に、問題を解決するためにやってみました内容を知っているようにしてください。 –

答えて

0

私はちょうど

Python Process Pool non-daemonic?

からオリジナルのアイデアを取り、

How to run nested, hierarchical pathos multiprocessing maps?で提案されているように扱うクリーンプロセスのエラーを維持するPythonの3のための答えを自分自身を発見しました

import pathos 
import signal 
import sys 
import os 
import time 

# redefine process pool via inheritance 
import multiprocess.context as context 
class NoDaemonProcess(context.Process): 
    def _get_daemon(self): 
     return False 
    def _set_daemon(self, value): 
     pass 
    daemon = property(_get_daemon, _set_daemon) 

class NoDaemonPool(pathos.multiprocessing.Pool): 
    def Process(self, *args, **kwds): 
     return NoDaemonProcess(*args, **kwds) 

def get_pid_i(x): 
    return os.getpid() 
def hard_kill_pool(pid_is, pool): 
    for pid_i in pid_is: 
     os.kill(pid_i, signal.SIGINT) # sending Ctrl+C 
    pool.terminate() 

def myproc(args): 
    i, max_workers = args 
    l_args = [j for j in range(i)] 
    mysubproc = lambda x : x 
    pool = pathos.pools.ProcessPool(max_workers) 
    pool.restart(force=True) 
    pid_is = pool.map(get_pid_i, range(max_workers)) 
    try: 
     l_traj_df = pool.amap(mysubproc, l_args) 
     counter_i = 0 
     while not l_traj_df.ready(): 
      time.sleep(1) 
      if counter_i % 30 == 0: 
       print('Waiting for children running in pool.amap() in myproc({}) with PIDs: {}'.format(i, pid_is)) 
      counter_i += 1 
     l_traj_df = l_traj_df.get() 
     pool.close() 
     pool.join() 
    except KeyboardInterrupt: 
     print('Ctrl+C received in myproc({}), attempting to terminate pool...').format(myens) 
     hard_kill_pool(pid_is, pool) # sending Ctrl+C 
     raise 
    except: 
     print('Attempting to close parallel after exception: {} in myproc({})'.format(sys.exc_info()[0], myens)) 
     hard_kill_pool(pid_is, pool) # sending Ctrl+C 
     raise 

#myproc = lambda x : x 

max_workers = [2, 1] 
pool = NoDaemonPool(max_workers[0]) 
#pool.restart(force=True) 
pid_is = pool.map(get_pid_i, range(max_workers[0])) 
try: 
    results = pool.map_async(myproc, l_args) 
    counter_i = 0 
    while not results.ready(): 
     if counter_i % 30 == 0: 
      print('Waiting for children running in pool.map_async() in main() with PIDs: {}'.format(pid_is)) 
     time.sleep(2) 
     counter_i += 1 
    results = results.get() 
    pool.close() 
    pool.join() 
except KeyboardInterrupt: 
    print('Ctrl+C received in main(), attempting to terminate pool...') 
    hard_kill_pool(pid_is, pool) # sending Ctrl+C 
    raise 
except: 
    print('Attempting to close parallel after exception: {} in main()'.format(_sys.exc_info()[0])) 
    hard_kill_pool(pid_is, pool) # sending Ctrl+C 
    raise 

出力:

Waiting for children running in pool.map_async() in main() with PIDs [15015, 15014] 
Waiting for children running in pool.amap() in myproc(2) with PIDs [15019] 
Waiting for children running in pool.amap() in myproc(1) with PIDs: [15020] 
Waiting for children running in pool.amap() in myproc(3) with PIDs [15021] 
Waiting for children running in pool.amap() in myproc(4) with PIDs [15022] 
Waiting for children running in pool.amap() in myproc(6) with PIDs [15024] 
Waiting for children running in pool.amap() in myproc(5) with PIDs [15023] 
Waiting for children running in pool.amap() in myproc(7) with PIDs [15025] 
Waiting for children running in pool.amap() in myproc(8) with PIDs [15026] 
Waiting for children running in pool.amap() in myproc(9) with PIDs [15028] 

私は、次のモジュールのバージョンを使用:

python     3.6.0       0 
pathos     0.2.1     py36_1 condo-forge 
multiprocess    0.70.4     py36_0 http://conda.binstar.org/omnia 
dill      0.2.7.1   py36h644ae93_0 
pox      0.2.3     py36_0 conda-forge 
ppft      1.6.4.7.1    py36_0 conda-forge 
six      1.10.0     py36_0