2017-06-30 12 views
0

私は、Actorモデルを使用してバックグラウンドプロセスを管理するための単純なオブジェクトセットを持っています。この場合、私は1人の俳優にしか関心がありません。ただし、アクターがメッセージを受信する間は永続的な状態を維持することが重要です。PythonでカスタムFuturesオブジェクトを作成する

オブジェクトは、メインスレッドのキューにメッセージを追加することで機能します。それから、メインスレッドは気に入ったように実行できます。しばらくして、結果キューに新しいものがあるかどうかを確認します。これが起こると、俳優が仕事を完了したことが分かります。

これがFuturesオブジェクトを使用してよりクリーンな方法で実装されているかどうかを知りたいと思います。現在の実装は次のとおりです。

import multiprocessing 
import time 
import collections 


class Client(object): 
    """ 
    Object used in the main thread to communicate with background actors 
    """ 
    def __init__(client): 
     client.manager = None 
     client.start() 

    def __del__(client): 
     if client.manager and client.manager.is_alive(): 
      client.get(StopIteration) 

    def start(client): 
     client.task_queue = multiprocessing.JoinableQueue() 
     client.result_queue = multiprocessing.Queue() 
     client.result_history = collections.deque(maxlen=1000) 
     client.manager = Manager(client.task_queue, client.result_queue) 
     client.manager.start() 

    def post(client, payload): 
     client.task_queue.put(payload) 

    def get(client, payload): 
     # Exhaust any existing results 
     list(client.results()) 
     # Post the command 
     client.post(payload) 
     # Wait for a response 
     result = client.wait_for_result() 
     return result 

    def wait_for_result(client): 
     wait = 0 
     while True: 
      for result in client.results(): 
       return result 
      time.sleep(wait) 
      wait = max(1, wait + .01) 

    def results(client): 
     """ Look at results put on the result_queue """ 
     while not client.result_queue.empty(): 
      item = client.result_queue.get() 
      client.result_history.append(item) 
      yield item 


class Manager(multiprocessing.Process): 
    """ 
    Manager manages a single actor. 
    A manager sends messages an actor and appends a response when it is done. 
    """ 
    def __init__(self, task_queue, result_queue): 
     super(Manager, self).__init__() 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     """ main loop """ 
     terminate = False 
     # Create Actor in separate process and send messages to it 
     actor = Actor() 

     while not terminate: 
      message = self.task_queue.get() 
      print('Sending message={} to actor'.format(message)) 
      try: 
       if message is StopIteration: 
        content = 'shutdown' 
        terminate = True 
       else: 
        content = actor.handle(message) 
      except Exception as ex: 
       print('Error handling message') 
       status = 'error' 
       content = repr(ex) 
      else: 
       status = 'success' 
       print('Actor finished handling message={}'.format(message)) 

      # Send back result 
      response = { 
       'status': status, 
       'content': content 
      } 
      self.task_queue.task_done() 
      self.result_queue.put(response) 
     print('Manager is shutting down') 


class Actor(object): 
    """ 
    An actor is given messages from its manager and performs actions in a 
    single thread. Its state is private and threadsafe. 
    """ 
    def __init__(actor): 
     actor.state = {} 

    def handle(actor, message): 
     if not isinstance(message, dict): 
      raise ValueError('Commands must be passed in a message dict') 
     message = message.copy() 
     action = message.pop('action', None) 
     if action is None: 
      raise ValueError('message must have an action item') 
     if action == 'hello world': 
      content = 'hello world' 
      return content 
     elif action == 'debug': 
      return actor 
     elif action == 'start': 
      actor.state['a'] = 3 
      return 'started' 
     elif action == 'add': 
      for i in range(10000000): 
       actor.state['a'] += 1 
      return 'added', actor.state['a'] 
     else: 
      raise ValueError('Unknown action=%r' % (action,)) 


def test(): 
    print('Starting Test') 
    client = Client() 
    print('About to send messages') 
    # Get sends a message and then blocks until the response is returned. 
    print(client.get({'action': 'hello world'})) 
    print(client.get({'action': 'start'})) 
    print(client.get({'action': 'add'})) 
    print('Test completed') 


if __name__ == '__main__': 
    test() 

このコードを修正してFutureオブジェクトを使用したいと思います。クライアントがメッセージを送信しようとするたびに、Futureオブジェクトを作成し、それをマルチプロセッシング・キューに送ることは可能ですか?その後、マネージャはactors関数を実行し、result_queueに結果を追加する代わりにFutureオブジェクトの状態を変更することができます。

これは、結果をアクターに送信されたメッセージと結びつけるクリーンな方法を提供するようです。また、私が最初の例で持っているgetメソッドとresultsメソッドの必要性もなくなります。

直感的に、私はそれがこのような何かを見てみたい:

from concurrent import futures 
import multiprocessing 


class Client(object): 
    """ 
    Object used in the main thread to communicate with background actors 
    """ 
    def __init__(client): 
     client.manager = None 
     client.start() 

    def __del__(client): 
     if client.manager and client.manager.is_alive(): 
      f = client.post(StopIteration) 

    def start(client): 
     client.task_queue = multiprocessing.JoinableQueue() 
     client.manager = Manager(client.task_queue) 
     client.manager.start() 

    def post(client, payload): 
     f = futures.Future() 
     client.task_queue.put((f, payload)) 
     return f 


class Manager(multiprocessing.Process): 
    """ 
    Manager manages a single actor. 
    """ 
    def __init__(self, task_queue): 
     super(Manager, self).__init__() 
     self.task_queue = task_queue 

    def run(self): 
     """ main loop """ 
     terminate = False 
     # Create Actor in separate process and send messages to it 
     actor = Actor() 

     while not terminate: 
      f, message = self.task_queue.get() 
      f.set_running_or_notify_cancel() 
      print('Sending message={} to actor'.format(message)) 
      try: 
       if message is StopIteration: 
        content = 'shutdown' 
        terminate = True 
       else: 
        content = actor.handle(message) 
      except Exception as ex: 
       print('Error handling message') 
       status = 'error' 
       content = repr(ex) 
      else: 
       status = 'success' 
       print('Actor finished handling message={}'.format(message)) 

      # Send back result 
      response = { 
       'status': status, 
       'content': content 
      } 
      self.task_queue.task_done() 
      f.set_result(response) 
     print('Manager is shutting down') 


class Actor(object): 
    """ 
    An actor is given messages from its manager and performs actions in a 
    single thread. Its state is private and threadsafe. 
    """ 
    def __init__(actor): 
     actor.state = {} 

    def handle(actor, message): 
     if not isinstance(message, dict): 
      raise ValueError('Commands must be passed in a message dict') 
     message = message.copy() 
     action = message.pop('action', None) 
     if action is None: 
      raise ValueError('message must have an action item') 
     if action == 'hello world': 
      content = 'hello world' 
      return content 
     elif action == 'debug': 
      return actor 
     elif action == 'start': 
      actor.state['a'] = 3 
      return 'started' 
     elif action == 'add': 
      for i in range(10000000): 
       actor.state['a'] += 1 
      return 'added', actor.state['a'] 
     else: 
      raise ValueError('Unknown action=%r' % (action,)) 


def test(): 
    print('Starting Test') 
    client = Client() 
    print('About to send messages') 
    f1 = client.post({'action': 'hello world'}) 
    print(f1.result()) 
    f2 = client.post({'action': 'start'}) 
    print(f2.result()) 
    f3 = client.post({'action': 'add'}) 
    print(f3.result()) 
    print('Test completed') 

if __name__ == '__main__': 
    test() 

しかし、これは明らかに正しく実行されません。私は未来を作るために何らかのプロセスプールマネージャーが必要だと信じています(プールマネージャーだけが呼び出すべきだと書かれているメソッドを呼び出すためです)。しかし、私はそれをどうやってやるかについてはあまりよく分かりません。シングルトンのワーカー関数をマップするために先物を使ったことがありますが、以前は状態を使って外部プロセスを管理したことはありません。

誰かがこれで私を助けることができますか? Futuresでこれを実装する方が簡単な方法がありますか?

答えて

関連する問題