私はチャットサーバーを作成しており、ユニットテスト中に次の問題が発生しました。私の単体テストでは、多くのテストクライアントを私のサーバに接続しています。接続されたユーザーの数が511に達すると、サーバーはエラーメッセージなしで応答を停止します。この段階では、すべてがPC上でローカルに実行されます。ツイスト:TCPサーバーへのクライアント接続数が制限されていますか?
私はフォーラムに貼り付ける簡単なサーバー、テストクライアント、および単体テストコードを用意しました。
なぜサーバーがハングアップするのですか? どんな助力も大変ありがとうございます。
このコードは基本的にツイストの簡単なチャットチュートリアルのものです。 簡易サーバー:
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
class Chat(LineReceiver):
def __init__(self, users, userNum):
self.users = users
self.userNum = userNum
def connectionMade(self):
print "Connected to user %d" % (self.userNum)
self.users[self.userNum] = self
def connectionLost(self, reason):
print "Connection to user %d lost" % (self.userNum)
if self.users.has_key(self.userNum):
del self.users[self.userNum]
def lineReceived(self, line):
for user in self.users:
if user == self.userNum:
continue
self.users[user].sendLine("%d - %s" % (self.userNum, line))
class ChatFactory(Factory):
def __init__(self):
self.users = {} # maps user names to Chat instances
self.nUsers = 0
def buildProtocol(self, addr):
self.nUsers += 1
return Chat(self.users, self.nUsers)
def clientConnectionFailed(self, connector, reason):
print 'connection failed:', reason.getErrorMessage()
reactor.stop()
def clientConnectionLost(self, connector, reason):
print 'connection lost:', reason.getErrorMessage()
reactor.stop()
reactor.listenTCP(8123, ChatFactory())
reactor.run()
これはテストクライアントです。このクライアントは、ユニットテストによって数回インスタンス化されます。
import socket
HOST = "localhost"
PORT = 8123
class TestClient:
def __init__(self):
self.connected = False
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error, msg:
print("Socket error %s" % msg)
def connect(self):
try:
self.socket.settimeout(10)
self.socket.connect((HOST, PORT))
self.connected = True
except socket.error, msg:
print("Socket error %s" % msg)
self.connected = False
def disconnect(self):
self.socket.close()
def connected(self):
return self.connected
最後ユニットテストコードファイル:
import unittest
from TestClient import TestClient
class TestSequenceFunctions(unittest.TestCase):
def test_manyUsers(self):
users = []
number_of_users = 1000
for u in range(number_of_users):
# create client
users.append(TestClient())
# connect client to server
users[-1].connect()
# check connected state
self.assertTrue(users[-1].connected, "User %d is not connected" % (u))
# close connection of all users
for user in users:
user.disconnect()
if __name__ == '__main__':
unittest.main()
これはどのOSですか?あなたはOS制限を打つことができますか? –
答えをありがとう。私は現在、Windows 7 64ビットで作業しています。私は、オンラインになったときに、どのOS上でサーバが動作するのか不明です。 OSの限界を知るにはどうすればいいですか? – Zoli