私は分散プログラムを使いました。ネットワーク内のすべてのノード(仮想マシン)は、他のすべてのノードとの間でデータを(送信接続を介して)送信し、(受信接続を介して)データを受信します。データを送信する前に、すべてのノードが他のすべてのノード(単一のソースノードを含む)へのソケットを持っています。 3秒の遅延の後、送信元はネットワーク内の他のノードのそれぞれに異なるファイルチャンクを送信し始めます。すべてのノードは、最初のパケットが到着した後に受信チャンクを転送し始める。ピアによる接続リセット[Errno 104] in Python
プログラムはエラーなしで何度も正常に終了します。しかし、時には1つのランダムなノードが(データを送信しているコネクションを通じてデータを送信しながら)コネクションをリセットすることがあります。
各ノードには、n-2送信側スレッドとn-1受信側スレッドがあります。
送信機能:
def relaySegment_Parallel(self):
connectionInfoList = []
seenSegments = []
readyServers = []
BUFFER_SIZE = Node.bufferSize
while len(readyServers) < self.connectingPeersNum-len(Node.sources) and self.isMainThreadActive(): #Data won't be relayed to the sources
try:
tempIp = None
for ip in Node.IPAddresses:
if ip not in readyServers and ip != self.ip and ip not in self.getSourcesIp():
tempIp = ip
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((ip, Node.dataPort))
connectionInfoList.append((s, ip))
readyServers.append(ip)
if Node.debugLevel2Enable:
print "RelayHandler: Outgoing connection established with IP: " + str(ip)
except socket.error, v:
errorcode = v[0]
if errorcode == errno.ECONNRESET:
print "(RelayHandler) Connection reset ! Node's IP: " + str(tempIp)
if errorcode == errno.ECONNREFUSED:
print "(RelayHandler) Node " + str(tempIp) + " are not ready yet!"
continue
except:
print "Error: Cannot connect to IP: " + str (tempIp)
continue
print "(RelayHandler) Ready to relay data to " + str(len(readyServers)) + " numeber of servers."
try:
pool = ThreadPool(processes = Node.threadPoolSize)
while Node.terminateFlag == 0 and not self.isDistributionDone() and self.isMainThreadActive():
if len(self.toSendTupleList) > 0:
self.toSendLock.acquire()
segmentNo, segmentSize, segmentStartingOffset, data = self.toSendTupleList.pop(0)
self.toSendLock.release()
if len(data) > 0:
if segmentNo not in seenSegments:
#Type: 0 = From Sourece , 1 = From Rlayer
#Sender Type/Segment No./Segment Size/Segment Starting Offset/
tempList = []
for s, ip in connectionInfoList:
tempData = "1/" + str(self.fileSize) + "/" + str(segmentNo) + "/" + str(segmentSize) + "/" + str(segmentStartingOffset) + "/"
tempList.append((s, ip, tempData))
pool.map(self.relayWorker, tempList)
seenSegments.append(segmentNo)
relayList = []
for s, ip in connectionInfoList:
relayList.append((s, ip, data))
pool.map(self.relayWorker, relayList)
for s, ip in connectionInfoList:
s.shutdown(1)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed.
s.close()
pool.close()
pool.join()
except socket.error, v:
errorcode=v[0]
if errorcode==errno.ECONNREFUSED:
print "(RelayHandler) Error: Connection Refused in RelaySegment function. It can not connect to: ", ip
else:
print "\n(RelayHandler) Error1 in relaying segments (Parallel) to ", ip, " !!! ErrorCode: ", errorcode
traceback.print_exception(*sys.exc_info())
except:
print "\n(RelayHandler) Error2 in relaying segments (Parallel) to ", ip
traceback.print_exception(*sys.exc_info())
受信機能:
def receiveDataHandler(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# Allows us to resue the port immediately after termination of the program
s.bind((self.ip, Node.dataPort))
s.listen(Node.MaxNumClientListenedTo)
threadsList = []
fHandler = fileHandler(self.inFileAddr, Node.bufferSize)
isStart = False
executionTime = 0
connectedPeersSofar = 0
while (not self.connectingPeersNum == connectedPeersSofar) and self.isMainThreadActive() and Node.terminateFlag == 0 and not self.isReceptionDone():
conn, ipAddr = s.accept()
thread_receiveData = Thread2(target = self.receiveData_Serial, args = (conn, ipAddr, fHandler))
thread_receiveData.start()
if Node.debugLevel2Enable:
print 'Receive Handler: New thread started for connection from address:', ipAddr
connectedPeersSofar += 1
threadsList.append(thread_receiveData)
if isStart == False:
isStart = True
print "(RecieiverHandeler) Receiver stops listening: Peers Num "+str(self.connectingPeersNum) +i " connected peers so far: " + str(connectedPeersSofar)
for i in range(0, len(threadsList)):
self.startTime = threadsList[i].join()
if isStart:
executionTime = float(time.time()) - float(self.startTime)
else:
print "\n\t No Start! Execution Time: --- 0 seconds ---" , "\n"
s.shutdown(2)# 0:Further receives are disallowed -- 1: Further sends are disallow/sends -- 2: Further sends and receives are disallowed.
s.close()
return executionTime
except socket.error, v:
errorcode = v[0]
if errorcode == 22: # 22: Invalid arument
print "Error: Invalid argument in connection acceptance (receive data handler)"
elif errorcode==errno.ECONNREFUSED:
print "Error: Connection Refused in receive"
else:
print "Error1 in Data receive Handler !!! ErrorCode: ", errorcode
traceback.print_exception(*sys.exc_info())
except:
print "Error2 in Data receive Handler !!!"
traceback.print_exception(*sys.exc_info())
ノードが(ランダム誤動作ノードを含む)他のすべてのノードに接続されている全てのノードプリントの送信スレッド。しかしながら、ランダムなノードの受信機能は
s.accept()
に待機し、任意の接続が、接続するための最後のものである単一のソースからの接続を受け付けません。ランダムノードは、例外を発生させることなく待機します。
s.accept()
は、いずれかが、最後の1を受け入れていない一方で、ランダムなノードの
s.listen()
(TCPのprotocole)は、送信者は、それらが接続されていることを考えさせるようです。次に、何らかの理由でコネクションをリセットするため、他の人(送信者)がデータを送信しようとすると「Connection reset by peer」例外が送出されるのです。何のエラーもなく仕事を終える唯一の送信者は、最後に接続する送信元です。
エラー:なぜ起こっていることを
Traceback (most recent call last):
File "/home/ubuntu/DCDataDistribution/Node.py", line 137, in relayWorker
socketConn.sendall(data)
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 104] Connection reset by peer
?
FYI: Amazon EC2インスタンスでプログラムを実行しています。各インスタンスのタイプはt2.micro(1 vCPU、2.5 GHz、Intel Xeonファミリ(最大3.3 GHz)、1 GiBメモリ)です。 Ubuntu Server 14.04 LTS(HVM)はすべてのインスタンスで実行されています。
ここで診断するコードは不十分で、説明は少し難しいです。推測のように、あなたの "ランダムな"受信ノードが間違ったソケットを閉じていると思われ、 'ECONNRESET'エラーが発生します。たぶん、データ構造のスレッドの同期の問題? –
@GilHamiltonコードの削除部分を追加しました。実際には、「ランダム」な受信ノードは、受信機能の「while」ブロックから決して出てこないので、他の人が接続するのを待っています(他の人は接続していると言います)。だから、ノードは、ソケットを閉じる機会を得るために "while"ブロックから出ることはありません。また、ソケットを閉じるのは "receiveData_Serial"関数ではなく "receiveDataHandler"関数だけです。これ以上の説明が必要な場合は、私は幸せ以上になります。 –
説明にコードをマッチさせるのに苦労しています。たとえば、「ソケットを閉じるのは「receiveDataHandler」だけですが、受け入れられたソケットはまったく閉じられていません。提案:もしノードAがノードBに接続していると言うなら、リモートポート番号がconnect(ノードAのgetpeername)のために何であるか調べる。次に、ノードBで 'netstat -atn'を実行し、そのポートがどの状態にあるかを調べます。 –