2017-12-18 9 views
1

マルチスレッドを理解するために遊んでいたので、サーバがクライアントにコマンドを送信するクライアント/サーバアプリケーションを作成しました。 'a'には、サーバーに返信が送信されます。Pythonマルチスレッドサーバは一度に1つのクライアントメッセージを処理できます

サーバーコードでは、2つのソケットと1つのスレッドを作成しました。最初のソケットは、接続された(登録された)すべてのクライアントにコマンドを送信(パブリッシュ)します。スレッドでは、2番目のソケットはクライアントからの応答を待ちますが、スレッドがいくつかのブロッキング操作を実行する(たとえば、クライアントがデータベースに送信した情報を格納するなど)ため、ソケット(req-repソケット)は同時に複数のメッセージを受信できます。

server.py

import zmq 
import logging 
import threading 
import time 

logging.basicConfig(level=logging.DEBUG) 


class Server(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.pub_port = 7777 
     self.rep_port = 7778 

     self.pub_socket = None 
     self.rep_socket = None 
     self.interface = "*" 

    def bind_ports(self): 
     logging.debug("[bind_ports] binding the ports....") 
     self.pub_socket = self.context.socket(zmq.PUB) 
     pub_bind_str = "tcp://{}:{}".format(self.interface, self.pub_port) 
     self.pub_socket.bind(pub_bind_str) 

     self.rep_socket = self.context.socket(zmq.REP) 
     rep_bind_str = "tcp://{}:{}".format(self.interface, self.rep_port) 
     self.rep_socket.bind(rep_bind_str) 

    def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      logging.info("[received_data] data <{}>".format(flow)) 
      self.rep_socket.send(b"\x00") 
      self.blocking_op(cl_data) 

    def blocking_op(self, data): 
     time.sleep(1) # simulating some blocking operations e.g. storing info in a database 

    def push_instruction(self, cmd): 
     logging.debug("[push_inst] Sending the instruction <%s> to the clients...", 
     # logging.debug("[push_inst] Sending the instruction <%s> to the agents ...", 
     cmd) 
     instruction = {"cmd": cmd} 
     self.pub_socket.send_json(instruction) 

    def create_thread(self): 
     thread = threading.Thread(target=self.received_info) 
     thread.daemon = True 
     thread.start() 
     logging.debug("[create_thread] Thread created <{}>".format(
                 thread.is_alive())) 

    def start_main_loop(self): 
     logging.debug("[start_main_loop] Loop started....") 
     self.bind_ports() 
     self.create_thread() 

     while True: 
      cmd = input("Enter your command: ") 
      self.push_instruction(cmd) 

if __name__ == "__main__": 
    Server().start_main_loop() 

client.py

import zmq 
import logging 
import random 
import time 

logging.basicConfig(level=logging.DEBUG) 

class Client(object): 
    def __init__(self): 
     self.context = zmq.Context() 
     self.sub_socket = None 
     self.req_socket = None 

     self.pub_port = 7777 
     self.req_port = 7778 
     self.server_ip = 'localhost' 

     self.client_id = "" 

    def connect_to_server(self): 
     logging.debug("[conn_to_serv] Connecting to the server ....") 
     self.sub_socket = self.context.socket(zmq.SUB) 
     self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") 
     conn_str = "tcp://{}:{}".format(self.server_ip, self.pub_port) 
     self.sub_socket.connect(conn_str) 

     self.req_socket = self.context.socket(zmq.REQ) 
     req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) 
     self.req_socket.connect(req_conn_str) 

    def get_instruction(self): 
     inst = self.sub_socket.recv_json() 
     logging.debug("[get_inst] Server sent inst") 
     cmd = inst["cmd"] 
     return cmd 
    def send_flow(self, x, y): 
     flow = { 
      "client_id": self.client_id, 
      "x": x, 
      "y": y 
     } 
     self.req_socket.send_json(flow) 

    def start_main_loop(self): 
     logging.debug("starting the main loop ....") 
     self.client_id = input("What is your id: ") 
     self.connect_to_server() 

     while True: 
      inst = self.get_instruction() 
      logging.info("[Main_loop] inst<{}>".format(inst)) 
      if inst == "a": 
       # time.sleep(random.uniform(.6, 1.5)) 
       self.send_flow("xxx", "yyy") 
       self.req_socket.recv() 
       logging.debug("[main_loop] server received the flow") 

if __name__ == "__main__": 
    Client().start_main_loop() 

誰もがそれがで複数のクライアントのメッセージを果たすことができるように私は、サーバーを向上させることができます場合、私はそれをお願い申し上げます同時。

+0

あなたの応答処理がブロックされるか、または時間がかかる場合は、あなたの 'receive_info()'で応答を読み込み、実際の処理を行うスレッドを起動するのがよいでしょう。このスレッドの実行には時間がかかりますが、メインループはブロックされません。 – Hannu

答えて

1

私はあなたのコードとテストを実行することができませんでしたが、問題がreceive_info()である場合は、スレッドを起動して実際の応答を処理することで回避できます。このような何か(タイプミスが含まれているかもしれませんが、私はあなたのコードをテストすることができませんでした - 。例えばflowが何であるか全くわから)

def handle_response(self, data): 
    logging.info("[received_data] data <{}>".format(flow)) 
    self.rep_socket.send(b"\x00") 
    self.blocking_op(data) 

def received_info(self): 
     while True: 
      # logging.debug("[received_flow] ") 
      cl_data = self.rep_socket.recv_json() 
      _t = threading.Thread(target=self.handle_response, args=(cl_data,)) 
      _t.start() 

これはそのままごreceived_info()ループを持っていますが、代わりの処理を行いますそこでは、新しいスレッドが起動されて応答を処理します。完了までに要する時間がかかってスレッドが消えますが、received_info()はすぐに新しい応答を待つ準備が整います。

+0

大変ありがとうございました。ところで、args =(cl_data、)なぜcl_dataの後に昏睡があるのですか? もう1つの質問:スレッド1000を使用したり、gevent(またはasyncio)を使用する方が良いでしょうか? – Corey

+0

1つの引数だけを渡すのでカンマがあり、 'args'はタプルでなければなりません。いくつかの引数を渡した場合、args =(a、b、c)を末尾のカンマなしで記述できますが、これは1つの項目からタプルを作成する最も簡単な方法です。 – Hannu

+0

私はasyncioの専門家ではないので、パフォーマンスについてはコメントできません。 Pythonは、GILのために、とにかく最も効率的な並行処理言語ではありません。スレッドを試し、問題がある場合は調査してください。あなたはスレッドで絶対に良いかもしれません。 – Hannu

関連する問題