私は、Actorモデルを使用してバックグラウンドプロセスを管理するための単純なオブジェクトセットを持っています。この場合、私は1人の俳優にしか関心がありません。ただし、アクターがメッセージを受信する間は永続的な状態を維持することが重要です。PythonでカスタムFuturesオブジェクトを作成する
オブジェクトは、メインスレッドのキューにメッセージを追加することで機能します。それから、メインスレッドは気に入ったように実行できます。しばらくして、結果キューに新しいものがあるかどうかを確認します。これが起こると、俳優が仕事を完了したことが分かります。
これがFuturesオブジェクトを使用してよりクリーンな方法で実装されているかどうかを知りたいと思います。現在の実装は次のとおりです。
import multiprocessing
import time
import collections
class Client(object):
"""
Object used in the main thread to communicate with background actors
"""
def __init__(client):
client.manager = None
client.start()
def __del__(client):
if client.manager and client.manager.is_alive():
client.get(StopIteration)
def start(client):
client.task_queue = multiprocessing.JoinableQueue()
client.result_queue = multiprocessing.Queue()
client.result_history = collections.deque(maxlen=1000)
client.manager = Manager(client.task_queue, client.result_queue)
client.manager.start()
def post(client, payload):
client.task_queue.put(payload)
def get(client, payload):
# Exhaust any existing results
list(client.results())
# Post the command
client.post(payload)
# Wait for a response
result = client.wait_for_result()
return result
def wait_for_result(client):
wait = 0
while True:
for result in client.results():
return result
time.sleep(wait)
wait = max(1, wait + .01)
def results(client):
""" Look at results put on the result_queue """
while not client.result_queue.empty():
item = client.result_queue.get()
client.result_history.append(item)
yield item
class Manager(multiprocessing.Process):
"""
Manager manages a single actor.
A manager sends messages an actor and appends a response when it is done.
"""
def __init__(self, task_queue, result_queue):
super(Manager, self).__init__()
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
""" main loop """
terminate = False
# Create Actor in separate process and send messages to it
actor = Actor()
while not terminate:
message = self.task_queue.get()
print('Sending message={} to actor'.format(message))
try:
if message is StopIteration:
content = 'shutdown'
terminate = True
else:
content = actor.handle(message)
except Exception as ex:
print('Error handling message')
status = 'error'
content = repr(ex)
else:
status = 'success'
print('Actor finished handling message={}'.format(message))
# Send back result
response = {
'status': status,
'content': content
}
self.task_queue.task_done()
self.result_queue.put(response)
print('Manager is shutting down')
class Actor(object):
"""
An actor is given messages from its manager and performs actions in a
single thread. Its state is private and threadsafe.
"""
def __init__(actor):
actor.state = {}
def handle(actor, message):
if not isinstance(message, dict):
raise ValueError('Commands must be passed in a message dict')
message = message.copy()
action = message.pop('action', None)
if action is None:
raise ValueError('message must have an action item')
if action == 'hello world':
content = 'hello world'
return content
elif action == 'debug':
return actor
elif action == 'start':
actor.state['a'] = 3
return 'started'
elif action == 'add':
for i in range(10000000):
actor.state['a'] += 1
return 'added', actor.state['a']
else:
raise ValueError('Unknown action=%r' % (action,))
def test():
print('Starting Test')
client = Client()
print('About to send messages')
# Get sends a message and then blocks until the response is returned.
print(client.get({'action': 'hello world'}))
print(client.get({'action': 'start'}))
print(client.get({'action': 'add'}))
print('Test completed')
if __name__ == '__main__':
test()
このコードを修正してFutureオブジェクトを使用したいと思います。クライアントがメッセージを送信しようとするたびに、Futureオブジェクトを作成し、それをマルチプロセッシング・キューに送ることは可能ですか?その後、マネージャはactors関数を実行し、result_queueに結果を追加する代わりにFutureオブジェクトの状態を変更することができます。
これは、結果をアクターに送信されたメッセージと結びつけるクリーンな方法を提供するようです。また、私が最初の例で持っているgetメソッドとresultsメソッドの必要性もなくなります。
直感的に、私はそれがこのような何かを見てみたい:
from concurrent import futures
import multiprocessing
class Client(object):
"""
Object used in the main thread to communicate with background actors
"""
def __init__(client):
client.manager = None
client.start()
def __del__(client):
if client.manager and client.manager.is_alive():
f = client.post(StopIteration)
def start(client):
client.task_queue = multiprocessing.JoinableQueue()
client.manager = Manager(client.task_queue)
client.manager.start()
def post(client, payload):
f = futures.Future()
client.task_queue.put((f, payload))
return f
class Manager(multiprocessing.Process):
"""
Manager manages a single actor.
"""
def __init__(self, task_queue):
super(Manager, self).__init__()
self.task_queue = task_queue
def run(self):
""" main loop """
terminate = False
# Create Actor in separate process and send messages to it
actor = Actor()
while not terminate:
f, message = self.task_queue.get()
f.set_running_or_notify_cancel()
print('Sending message={} to actor'.format(message))
try:
if message is StopIteration:
content = 'shutdown'
terminate = True
else:
content = actor.handle(message)
except Exception as ex:
print('Error handling message')
status = 'error'
content = repr(ex)
else:
status = 'success'
print('Actor finished handling message={}'.format(message))
# Send back result
response = {
'status': status,
'content': content
}
self.task_queue.task_done()
f.set_result(response)
print('Manager is shutting down')
class Actor(object):
"""
An actor is given messages from its manager and performs actions in a
single thread. Its state is private and threadsafe.
"""
def __init__(actor):
actor.state = {}
def handle(actor, message):
if not isinstance(message, dict):
raise ValueError('Commands must be passed in a message dict')
message = message.copy()
action = message.pop('action', None)
if action is None:
raise ValueError('message must have an action item')
if action == 'hello world':
content = 'hello world'
return content
elif action == 'debug':
return actor
elif action == 'start':
actor.state['a'] = 3
return 'started'
elif action == 'add':
for i in range(10000000):
actor.state['a'] += 1
return 'added', actor.state['a']
else:
raise ValueError('Unknown action=%r' % (action,))
def test():
print('Starting Test')
client = Client()
print('About to send messages')
f1 = client.post({'action': 'hello world'})
print(f1.result())
f2 = client.post({'action': 'start'})
print(f2.result())
f3 = client.post({'action': 'add'})
print(f3.result())
print('Test completed')
if __name__ == '__main__':
test()
しかし、これは明らかに正しく実行されません。私は未来を作るために何らかのプロセスプールマネージャーが必要だと信じています(プールマネージャーだけが呼び出すべきだと書かれているメソッドを呼び出すためです)。しかし、私はそれをどうやってやるかについてはあまりよく分かりません。シングルトンのワーカー関数をマップするために先物を使ったことがありますが、以前は状態を使って外部プロセスを管理したことはありません。
誰かがこれで私を助けることができますか? Futuresでこれを実装する方が簡単な方法がありますか?