0

私はクラインとデファーを探しています。次の例では、子プロセスを使用して数値をインクリメントし、Future経由で返すようにしています。私は未来のコールバックを受け取ることができます。遅れているクラインのアプリ

遅延オブジェクトはcb()関数を呼び出すことはなく、エンドポイントへの要求は返されないという問題があります。問題を特定するのを手伝ってください。続き

は、プロセスP1を開始する前にコールバックを追加Process4.pyコードです

from multiprocessing import Process 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 

def foo(x): 
    result = x+1 
    sleep(3) 
    return result 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.f = Future() 
     self.visit = 0 

    def run(self): 
     r = foo(self.visit) 
     self.f.set_result(result=r) 

def cb(result): 
    print('visitor number {}'.format(result)) 
    return result 

def eb(err): 
    print('error occurred {}'.format(err)) 
    return err 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     if e: 
      d.errback(e) 
     else: 
      d.callback(f.result()) 

    future.add_done_callback(callback) 
    return d 

def get_visitor_num(): 
    p1 = MyProcess(target=foo, args=None) 
    d = future_to_deferred(p1.f) 
    p1.start() 
    d.addCallback(cb) 
    d.addErrback(eb) 
    sleep(1) 
    return d 

編集1

後、私のserver.pyコード

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 
import Process4 

if __name__ == '__main__': 
    app = Klein() 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(request):   
     try: 
      resp = yield Process4.get_visitor_num() 
      req.setResponseCode(200) 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

での問題を解決cb()関数を呼び出します。しかし、依然としてエンドポイントに対して行われたhttp要求は返されません。

+0

後ツイストとSTDLIB・マルチプロセッシングモジュールが悪い適合しています。代わりにアンプルを検討してください。 https://stackoverflow.com/questions/5715217/mix-python-twisted-with-multiprocessingおよびhttps://stackoverflow.com/questions/1470850/twisted-network-client-with-multiprocessing-workersおよび他の同様の質問を参照してください。 SO。 –

+0

'reactor.callFromThread'を呼び出して、結果がメインスレッドにセットされるようにしなければなりません。 [私がしばらく前に与えたこの回答](https://stackoverflow.com/questions/45930518/how-to-make-twisted-defer-get-function-result/45969032#45969032)を見て、それが理にかなっている。似たようなものを適用できるはずです。 –

+0

お返事ありがとうございます。下の私の答えを見てください。 @ notorious.no、Jean-Paul Calderone –

答えて

0

それは)(実行で将来結果self.f.set_result(結果= R)設定方法は、どのスレッドがなく子プロセス、内コールバック()メソッドをトリガすることが判明結果が返されるのを待つ!

MainProcessでトリガされたcallback()関数を取得するには、MainProcess内のワーカースレッドを使用してマルチプロセスキューを使用して子プロセスから結果を取得し、将来の結果を設定する必要がありました。

@ notorious.no返信いただきありがとうございます。私が気付いたのは、reactor.callFromThreadは、変更されたコードで作業スレッドからMainThreadへの切り替えを行いますが、d.callback(f.result())はうまく動作しますがワーカースレッドの結果を返します。

修正作業のコードである

server.py

from klein import Klein 
from twisted.internet.defer import inlineCallbacks, returnValue 


import Process4 

if __name__ == '__main__': 
    app = Klein() 
    visit_count = 0 

    @app.route('/visit') 
    @inlineCallbacks 
    def get_num_visit(req): 
     global visit_count 
     try: 
      resp = yield Process4.get_visitor_num(visit_count) 
      req.setResponseCode(200) 
      visit_count = resp 
      returnValue('Visited = {}'.format(resp)) 
     except Exception as e: 
      req.setResponseCode(500) 
      returnValue('error {}'.format(e)) 

    print('starting server') 
    app.run('0.0.0.0', 5005) 

Process4.py

from multiprocessing import Process, Queue 
from concurrent.futures import Future 
from time import sleep 
from twisted.internet.defer import Deferred 
import threading 
from twisted.internet import reactor 


def foo(x, q): 
    result = x+1 
    sleep(3) 
    print('setting result, {}'.format(result)) 
    q.put(result) 


class MyProcess(Process): 

    def __init__(self, target, args): 
     super().__init__() 
     self.target = target 
     self.args = args 
     self.visit = 0 

    def run(self): 
     self.target(*self.args) 


def future_to_deferred(future): 
    d = Deferred() 

    def callback(f): 
     e = f.exception() 
     print('inside callback {}'.format(threading.current_thread().name)) 
     if e: 
      print('calling errback') 
      d.errback(e) 
      # reactor.callFromThread(d.errback, e) 
     else: 
      print('calling callback with result {}'.format(f.result())) 
      # d.callback(f.result()) 
      reactor.callFromThread(d.callback, f.result()) 
    future.add_done_callback(callback) 
    return d 


def wait(q,f): 
    r = q.get(block=True) 
    f.set_result(r) 


def get_visitor_num(x): 

    def cb(result): 
     print('inside cb visitor number {} {}'.format(result, threading.current_thread().name)) 
     return result 

    def eb(err): 
     print('inside eb error occurred {}'.format(err)) 
     return err 

    f = Future() 
    q = Queue() 
    p1 = MyProcess(target=foo, args=(x,q,)) 

    wait_thread = threading.Thread(target=wait, args=(q,f,)) 
    wait_thread.start() 

    defr = future_to_deferred(f) 
    defr.addCallback(cb) 
    defr.addErrback(eb) 
    p1.start() 
    print('returning deferred') 
    return defr 
関連する問題