2017-08-27 15 views
1

whileループで実行するいくつかのプロセスがあります。私は基本的にいくつかのプロセスでデータを収集しています。停止する前に、データをCSVファイルまたはJSONファイルに保存する必要があります。私が今持っているのは、super関数を使ってmultiprocessing.Processクラスのjoinメソッドをオーバーライドすることです。process.joinを呼び出すときにスクリプトを実行するにはどうすればよいですか?

class Processor(multiprocessing.Process): 
    def __init__(self, arguments): 
     multiprocessing.Process.__init__(self) 

    def run(self): 
     self.main_function() 

    def main_function(self): 
     While True: 
      #do things to incoming data 

    def function_on_join(self): 
     #do one last thing before the process ends 

    def join(self, timeout=None): 
     self.function_on_join() 
     super(Processor, self).join(timeout=timeout) 

これを行うには良い方法/正しい方法/もっとニシキヘビ方法はありますか?

答えて

1

concurrent.futuresモジュールをご覧ください。

あなたの仕事をワーカーのプールで実行するタスクのリストとして記述できる場合は、

タスクベースのあなたがjobs(ファイル名の例リスト)のシーケンスを持っていて、それらを並行して処理したいとき

マルチプロセッシング - 次のようにあなたがそうすることができます。

from concurrent.futures import ProcessPoolExecutor  
import requests 

def get_url(url): 
    resp = requests.get(url) 
    print(f'{url} - {resp.status_code}') 
    return url 

jobs = ['http://google.com', 'http://python.org', 'http://facebook.com'] 

# create process pool of 3 workers 
with ProcessPoolExecutor(max_workers=1) as pool: 
    # run in parallel each job and gather the returned values 
    return_values = list(pool.map(get_url, jobs)) 

print(return_values) 

出力:

http://google.com - 200 
http://python.org - 200 
http://facebook.com - 200 
['http://google.com', 'http://python.org', 'http://facebook.com'] 

ないタスクベースのマルチプロセッシング

最初のケースのようにジョブを消費しない複数のサブプロセスを実行したい場合は、multiprocessing.Processを使用します。

threading.Threadと同様に、OOPファッションと同様に手順的に使用することができます。

import os 
from multiprocessing import Process 

def func(): 
    print(f'hello from: {os.getpid()}') 

processes = [Process(target=func) for _ in range(4)] # creates 4 processes 

for process in processes: 
    process.daemon = True # close the subprocess if the main program closes 
    process.start() # start the process 

出力:手続きファッション(私見もっとニシキヘビ)のための

例あなたはProcess.join()を使用して待ちたい場合

を終了するプロセスを待っている

hello from: 31821 
hello from: 31822 
hello from: 31823 
hello from: 31824 

(詳細info on process.join() & process.daemon on this SO answer)あなたはこのようにそれを行うことができます。

import os 
import time 
from multiprocessing import Process 

def func(): 
    time.sleep(3) 
    print(f'hello from: {os.getpid()}') 

processes = [Process(target=func) for _ in range(4)] # creates 4 processes 

for process in processes: 
    process.start() # start the process 

for process in processes: 
    process.join() # wait for the process to finish 

print('all processes are done!') 

この出力:

hello from: 31980 
hello from: 31983 
hello from: 31981 
hello from: 31982 
all processes are done! 

関連する問題