2017-04-25 11 views
1

コールバック関数とハンドラを使用してPythonで奇妙な現象が発生しています。 私はZMQを使用して通信を処理し、ソケットのストリームを使用します。コールバック関数でインスタンス内の正しい値が表示されない

import multiprocessing  
import zmq 
from concurrent.futures import ThreadPoolExecutor 
from zmq.eventloop import ioloop, zmqstream 
from zmq.utils import jsonapi as json 

# Types of messages 
TYPE_A = 'type_a' 
TYPE_B = 'type_b' 


class ZmqProcess(multiprocessing.Process): 
    def __init__(self): 
     super(ZmqProcess, self).__init__() 
     self.context = None 
     self.loop = None 
     self.handle_stream = None 

    def setup(self): 
     self.context = zmq.Context() 
     self.loop = ioloop.IOLoop.instance() 

    def send(self, msg_type, msg, host, port): 
     sock = zmq.Context().socket(zmq.PAIR) 
     sock.connect('tcp://%s:%s' % (host, port)) 
     sock.send_json([msg_type, msg]) 

    def stream(self, sock_type, addr): 
     sock = self.context.socket(sock_type) 
      if isinstance(addr, str): 
      addr = addr.split(':') 
     host, port = addr if len(addr) == 2 else (addr[0], None) 
      if port: 
      sock.bind('tcp://%s:%s' % (host, port)) 
     else: 
      port = sock.bind_to_random_port('tcp://%s' % host) 
     stream = zmqstream.ZMQStream(sock, self.loop)  
     return stream, int(port) 

class MessageHandler(object): 
    def __init__(self, json_load=-1): 
     self._json_load = json_load 
     self.pool = ThreadPoolExecutor(max_workers=10) 

    def __call__(self, msg): 
     i = self._json_load 
     msg_type, data = json.loads(msg[i]) 
     msg[i] = data 
     if msg_type.startswith('_'): 
      raise AttributeError('%s starts with an "_"' % msg_type) 
     getattr(self, msg_type)(*msg) 

そして、私はそれを継承するクラスがあります:私は、基本クラスを持っているまた、私はストレージとして使用するクラスを持っている

import zmq  
import zmq_base  

class ZmqServerMeta(zmq_base.ZmqProcess): 
    def __init__(self, bind_addr, handlers): 
     super(ZmqServerMeta, self).__init__() 
     self.bind_addr = bind_addr 
     self.handlers = handlers 

    def setup(self): 
     super(ZmqServerMeta, self).setup() 
     self.handle_stream, _ = self.stream(zmq.PAIR, self.bind_addr) 
     self.handle_stream.on_recv(StreamHandler(self.handle_stream, self.stop, 
               self.handlers)) 

    def run(self): 
     self.setup() 
     self.loop.start() 

    def stop(self): 
     self.loop.stop() 

class StreamHandler(zmq_base.MessageHandler): 
    def __init__(self, handle_stream, stop, handlers): 
     super(StreamHandler, self).__init__() 
     self._handle_stream = handle_stream 
     self._stop = stop 
     self._handlers = handlers 

    def type_a(self, data): 
     if zmq_base.TYPE_A in self._handlers: 
      if self._handlers[zmq_base.TYPE_A]: 
       for handle in self._handlers[zmq_base.TYPE_A]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def type_b(self, data): 
     if zmq_base.TYPE_B in self._handlers: 
      if self._handlers[zmq_base.TYPE_B]: 
       for handle in self._handlers[zmq_base.TYPE_B]: 
        self.pool.submit(handle, data) 
      else: 
       pass 
     else: 
      pass 

    def endit(self): 
     self._stop() 

を。問題の始まりは次のとおりです。

import threading 
import zmq_server_meta as server 
import zmq_base as base 


class Storage: 
    def __init__(self): 
     self.list = [] 

     self.list_lock = threading.RLock() 

     self.zmq_server = None 
     self.host = '127.0.0.1' 
     self.port = 5432 
     self.bind_addr = (self.host, self.port) 

    def setup(self): 
     handlers = {base.TYPE_A: [self. remove]} 
     self.zmq_server = server.ZmqServerMeta(handlers=handlers, bind_addr=self.bind_addr) 
     self.zmq_server.start() 

    def add(self, data): 
     with self.list_lock: 
      try: 
       self.list.append(data) 
      except: 
       print "Didn't work" 

    def remove(self, msg): 
     with self.list_lock: 
      try: 
       self.list.remove(msg) 
      except: 
       print "Didn't work" 

考えられるのは、そのクラスが受信するグローバル情報がいくつか格納されているということです。 すべてのテストをファイルに開始されます。

import sys 
import time 
import storage 
import zmq_base as base 
import zmq_server_meta as server 



def printMsg(msg): 
    print msg 

store = storage.Storage() 

store.setup() 
handlers = {base.TYPE_B: [printMsg]} 
client = server.ZmqServerMeta(handlers=handlers, bind_addr=('127.0.0.1', 5431)) 
client.start() 

message = "Test" 

store.add(message) 
client.send(base.TYPE_A, message, '127.0.0.1', 5432) 

を私は混乱を減らすために、それを簡素化。単に追加するのではなく、通常は送信してから応答が返されます。レスポンス(クライアントの送信)は正しいコールバックremove()によって処理され、リストから何かを削除する必要があります。問題は、remove()関数が空のリストを見ることですが、リストに要素があるはずです。テストファイルからチェックすると、要素が追加された後にその要素が表示され、そこからremove()を呼び出すと、空でないリストが表示され、削除できます。私の質問は、コールバックに空のリストが表示されるのはなぜですか、リスト内の正しい要素を確認するにはどうすればよいですか?

種類が パトリック

答えて

1

に関しては、私はこの問題は、ZmqProcessクラスがmultiprocessing.Processから継承という事実に産むと考えています。マルチプロセスでは、異なるプロセス間でオブジェクトを共有することはできません。値または配列を使用した共有メモリマップを使用する場合を除きます(https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes

カスタムオブジェクトを使用する場合は、サーバプロセス/プロキシオブジェクト。ドキュメントの同じページにあります。

例えば、self.manager = Manager()のように、Storageクラスのinit関数でマネージャーを定義することができます。その後、self.list = self.manager.list()と入力します。これはトリックを行う必要があります。

関連する問題