ZeroMQで奇妙な動作が発生しましたが、私は今一日中デバッグしようとしています。ZMQ:複数の同時リクエストとポーリングでREQ/REPが失敗する
問題を再現する最小限のスクリプト例です。これはPython3で実行できます。
REPソケットを持つ1台のサーバが起動し、REPソケット付きの5台のクライアントが基本的に同時に接続します。その結果、最初のいくつかのメッセージの後に何らかの理由でサーバーがブロックされ始めます。 poller.poll(1000)
は無期限にブロックされているようです。
この動作もタイミングに依存しているようです。クライアントを起動するループにsleep(0.1)
を挿入し、期待どおりに動作します。
私はREPソケットがすべての着信メッセージを待ち行列に入れ、次々とそれを解放すると予想していました。sock.recv_multipart()
。
ここで何が起こっているか
import logging
from threading import Thread
from time import sleep
import zmq
logging.basicConfig(level=logging.INFO)
PORT = "3446"
stop_flag = False
def server():
logging.info("started server")
context = zmq.Context()
sock = context.socket(zmq.REP)
sock.bind("tcp://*:" + PORT)
logging.info("bound server")
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
while not stop_flag:
socks = dict(poller.poll(1000))
if socks.get(sock) == zmq.POLLIN:
request = sock.recv_multipart()
logging.info("received %s", request)
# sleep(0.5)
sock.send_multipart(["reply".encode()] + request)
sock.close()
def client(name:str):
context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect("tcp://localhost:" + PORT)
sock.send_multipart([name.encode()])
logging.info(sock.recv_multipart())
sock.close()
logging.info("starting server")
server_thread = Thread(target=server)
server_thread.start()
sleep(1)
nr_of_clients = 5
for i in range(nr_of_clients):
Thread(target=client, args=[str(i)]).start()
stop_flag = True