0

私は子プロセスを作成し、Futureを使用して結果を受け取って、必要なときにそれらの一部を強制終了するという要件があります。マルチプロセスとプロセスをconcurrent.future._base.Futureで統合する

私はこれをサブクラス化し、multiprocessing.Processクラスを作成し、start()メソッドからFutureオブジェクトを返します。

問題は、決して呼び出されないので、私はcb()関数で結果を受け取ることができないということです。

私の現在の実装ではこれが何か他の方法で行なわれているのであれば、助けてください。続き

は、あなたがあなたが返す新しい未来を作成し、あなたのstartメソッドで

from multiprocessing import Process, Queue 
from concurrent.futures import _base 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = _base.Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

    def start(self): 
     f = _base.Future() 
     run_thread = threading.Thread(target=self.run) 
     run_thread.start() 
     return f 


def cb(future): 
    print('received result in callback {}'.format(future)) 


def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    f = p1.start() 
    f.add_done_callback(fn=cb) 

    sleep(10) 


if __name__ == '__main__': 

    main() 

    print('Main thread dying') 

答えて

1

私の現在のアプローチです。これは、あなたが結果を設定したものとは異なる将来であり、この未来はまったく使われていません。試してください:

def start(self): 
    run_thread = threading.Thread(target=self.run) 
    run_thread.start() 
    return self.f 

ただし、コードにはさらに問題があります。プロセスのstartメソッドをオーバーライドし、ワーカースレッドの実行と置き換えて、実際にはマルチプロセッシングをバイパスします。また、_baseモジュールをインポートしないでください。これは先頭のアンダースコアから見た実装の詳細です。 concurrent.futures.Futureをインポートする必要があります(これは同じクラスですが、公開APIを使用しています)。

は、これは本当にマルチプロセッシング使用しています:

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x,q): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    q.put(result) 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     q = Queue() 
     worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,))) 
     worker_thread.start() 
     r = q.get(block=True) 
     print('setting result {}'.format(r)) 
     self.f.set_result(result=r) 
     print('done setting result') 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

そして、あなたが本当に必要はありません、あなたのターゲット機能を実行するワーカースレッドを産卵、新しいプロセスでは、すでにしている、あなたは自分の目的の機能を実行することができます代わりに直接。ターゲット関数があなたがそれについて知らないExceptionを発生させた場合、あなたのコールバックは成功すると呼び出されます。あなたがいることを修正した場合、その後、あなたが残っているので:

from multiprocessing import Process 
from concurrent.futures import Future 
import threading 
from time import sleep 


def foo(x): 
    print('result {}'.format(x*x)) 
    result = x*x 
    sleep(5) 
    return result 

class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 

    def run(self): 
     try: 
      r = self.target(*self.args) 
      print('setting result {}'.format(r)) 
      self.f.set_result(result=r) 
      print('done setting result') 
     except Exception as ex: 
      self.f.set_exception(ex) 

def cb(future): 
    print('received result in callback {}: {}'.format(future, future.result())) 

def main(): 
    p1 = MyProcess(target=foo, args=(2,)) 
    p1.f.add_done_callback(fn=cb) 
    p1.start() 
    p1.join() 
    sleep(10) 

if __name__ == '__main__': 
    main() 
    print('Main thread dying') 

これはProcessPoolExecutorが何をするか、基本的です。

+0

返信いただきありがとうございます。これは本当にばかげた間違いでした。気づいてくれてありがとう! :) –

関連する問題