2012-03-01 8 views
1

私は現在、スレッド化されたサーバー/クライアントシステムを設定することが割り当てられている学校プロジェクトに取り組んでいます。システム内の各クライアントは、接続時にサーバー上に独自のスレッドが割り当てられているはずです。さらに、私は、サーバーが他のスレッドを実行することを望みます。コマンドラインからの入力に関するものと、すべてのクライアントへのメッセージのブロードキャストに関するものです。しかし、私は私が望むようにこれを実行することはできません。スレッドが互いにブロックしているようです。プログラムがコマンドラインから入力を受け取り、サーバーが接続しているクライアントに聞くように「同じ時間に」などのようにしたいと思います。Pythonでのスレッドの同期

私はPythonプログラミングとマルチスレッドの新機能ですが、私の考えは良いと思いますが、私のコードは機能しません。私は、別のスレッド間でメッセージの受け渡しをどのように実装するのか、正確にはわかりません。また、リソースロックコマンドを正しく実装する方法もありません。私はここに私のサーバーファイルと私のクライアントファイルのコードを掲載するつもりです、そして、私は誰かが私にこれを手伝ってくれることを願っています。私はこれが実際には2つの相対的に単純なスクリプトであるべきだと思います。私はできる限り良いものとして私のコードにコメントしようとしました。

import select 
import socket 
import sys 
import threading 
import client 

class Server: 

#initializing server socket 
def __init__(self, event): 
    self.host = 'localhost' 
    self.port = 50000 
    self.backlog = 5 
    self.size = 1024 
    self.server = None 
    self.server_running = False 
    self.listen_threads = [] 
    self.local_threads = [] 
    self.clients = [] 
    self.serverSocketLock = None 
    self.cmdLock = None 
    #here i have also declared some events for the command line input 
    #and the receive function respectively, not sure if correct 
    self.cmd_event = event 
    self.socket_event = event 

def openSocket(self): 
    #binding server to port 
    try: 
     self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.server.bind((self.host, self.port)) 
     self.server.listen(5) 
     print "Listening to port " + str(self.port) + "..." 
    except socket.error, (value,message): 
     if self.server: 
      self.server.close() 
     print "Could not open socket: " + message 
     sys.exit(1) 

def run(self): 
    self.openSocket() 

    #making Rlocks for the socket and for the command line input 

    self.serverSocketLock = threading.RLock() 
    self.cmdLock = threading.RLock() 

    #set blocking to non-blocking 
    self.server.setblocking(0) 
    #making two threads always running on the server, 
    #one for the command line input, and one for broadcasting (sending) 
    cmd_thread = threading.Thread(target=self.server_cmd) 
    broadcast_thread = threading.Thread(target=self.broadcast,args=[self.clients]) 
    cmd_thread.daemon = True 
    broadcast_thread.daemon = True 
    #append the threads to thread list 
    self.local_threads.append(cmd_thread) 
    self.local_threads.append(broadcast_thread) 

    cmd_thread.start() 
    broadcast_thread.start() 


    self.server_running = True 
    while self.server_running: 

     #connecting to "knocking" clients 
     try: 
      c = client.Client(self.server.accept()) 
      self.clients.append(c) 
      print "Client " + str(c.address) + " connected" 

      #making a thread for each clientn and appending it to client list 
      listen_thread = threading.Thread(target=self.listenToClient,args=[c]) 
      self.listen_threads.append(listen_thread) 
      listen_thread.daemon = True 
      listen_thread.start() 
      #setting event "client has connected" 
      self.socket_event.set() 

     except socket.error, (value, message): 
      continue 

    #close threads 

    self.server.close() 
    print "Closing client threads" 
    for c in self.listen_threads: 
     c.join() 

def listenToClient(self, c): 

    while self.server_running: 

     #the idea here is to wait until the thread gets the message "client 
     #has connected" 
     self.socket_event.wait() 
     #then clear the event immidiately... 
     self.socket_event.clear() 
     #and aquire the socket resource 
     self.serverSocketLock.acquire() 

     #the below is the receive thingy 

     try: 
      recvd_data = c.client.recv(self.size) 
      if recvd_data == "" or recvd_data == "close\n": 
       print "Client " + str(c.address) + (" disconnected...") 
       self.socket_event.clear() 
       self.serverSocketLock.release() 
       return 

      print recvd_data 

      #I put these here to avoid locking the resource if no message 
      #has been received 
      self.socket_event.clear() 
      self.serverSocketLock.release() 
     except socket.error, (value, message): 
      continue    

def server_cmd(self): 

    #this is a simple command line utility 
    while self.server_running: 

     #got to have a smart way to make this work 

     self.cmd_event.wait() 
     self.cmd_event.clear() 
     self.cmdLock.acquire() 


     cmd = sys.stdin.readline() 
     if cmd == "": 
      continue 
     if cmd == "close\n": 
      print "Server shutting down..." 
      self.server_running = False 

     self.cmdLock.release() 


def broadcast(self, clients): 
    while self.server_running: 

     #this function will broadcast a message received from one 
     #client, to all other clients, but i guess any thread 
     #aspects applied to the above, will work here also 

     try: 
      send_data = sys.stdin.readline() 
      if send_data == "": 
       continue 
      else: 
       for c in clients: 
        c.client.send(send_data) 
      self.serverSocketLock.release() 
      self.cmdLock.release() 
     except socket.error, (value, message): 
      continue 

if __name__ == "__main__": 
e = threading.Event() 
s = Server(e) 
s.run() 

そしてクライアントファイル

import select 
import socket 
import sys 
import server 
import threading 

class Client(threading.Thread): 

#initializing client socket 

def __init__(self,(client,address)): 
    threading.Thread.__init__(self) 
    self.client = client 
    self.address = address 
    self.size = 1024 
    self.client_running = False 
    self.running_threads = [] 
    self.ClientSocketLock = None 

def run(self): 

    #connect to server 
    self.client.connect(('localhost',50000)) 

    #making a lock for the socket resource 
    self.clientSocketLock = threading.Lock() 
    self.client.setblocking(0) 
    self.client_running = True 

    #making two threads, one for receiving messages from server... 
    listen = threading.Thread(target=self.listenToServer) 

    #...and one for sending messages to server 
    speak = threading.Thread(target=self.speakToServer) 

    #not actually sure wat daemon means 
    listen.daemon = True 
    speak.daemon = True 

    #appending the threads to the thread-list 
    self.running_threads.append(listen) 
    self.running_threads.append(speak) 
    listen.start() 
    speak.start() 

    #this while-loop is just for avoiding the script terminating 
    while self.client_running: 
     dummy = 1 

    #closing the threads if the client goes down 
    print "Client operating on its own" 
    self.client.close() 

    #close threads 
    for t in self.running_threads: 
     t.join() 
    return 

#defining "listen"-function 
def listenToServer(self): 
    while self.client_running: 

     #here i acquire the socket to this function, but i realize I also 
     #should have a message passing wait()-function or something 
     #somewhere 
     self.clientSocketLock.acquire() 

     try: 
      data_recvd = self.client.recv(self.size) 
      print data_recvd 
     except socket.error, (value,message): 
      continue 

     #releasing the socket resource 
     self.clientSocketLock.release() 

#defining "speak"-function, doing much the same as for the above function  
def speakToServer(self): 
    while self.client_running: 
     self.clientSocketLock.acquire() 
     try: 
      send_data = sys.stdin.readline() 
      if send_data == "close\n": 
       print "Disconnecting..." 
       self.client_running = False 
      else: 
       self.client.send(send_data) 
     except socket.error, (value,message): 
      continue 

     self.clientSocketLock.release() 

if __name__ == "__main__": 
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) 
c.run() 

を私はこれがあなたを介して読み取るためのかなりの数のコード行で実現するが、私はそれをコンセプトとスクリプト自己がすべきだと思う、私が言ったように非常に理解しやすい。誰かが私には適切な方法で私のスレッドを同期助けることができるかどうかは)=事前

おかげ---編集---

OK非常にappriciatedだろう。そこで、サーバーとクライアントモジュールの両方でsend関数とreceive関数だけを含むようにコードを単純化しました。サーバーに接続しているクライアントは独自のスレッドを取得し、両方のモジュールのsend関数とreceive関数はそれぞれのスレッドで動作します。これは、サーバーモジュール内のブロードキャスト機能が、あるクライアントからすべてのクライアントに到達する文字列をエコーするという魅力的なように機能します。ここまでは順調ですね!

スクリプトを実行したい次のことは、クライアントモジュールでクライアントをシャットダウンし、実行中のすべてのスレッドをスレッドリストに参加させるための特定のコマンド、つまり「閉じる」を実行することです。 speakToServerスレッドが入力 "close"を読み込んだことをlistenToServerとメインスレッドに通知するイベントフラグを使用しています。メインスレッドがwhileループから飛び出し、他のスレッドを結合するはずのforループを開始するようです。しかし、ここでそれはハングアップします。イベントフラグが設定されているときに、server_runningをFalseに設定しても、listenToServerスレッドのwhileループは停止しないようです。

私はクライアントとサーバモジュールの両方でより多くのスレッドを同期させることに関連したこれらの2つのスレッドを同期させるための答えを推測するので、クライアントモジュールのみをここに掲載します。後で

import select 
import socket 
import sys 
import server_bygg0203 
import threading 
from time import sleep 

class Client(threading.Thread): 

#initializing client socket 

def __init__(self,(client,address)): 

threading.Thread.__init__(self) 
self.client = client 
self.address = address 
self.size = 1024 
self.client_running = False 
self.running_threads = [] 
self.ClientSocketLock = None 
self.disconnected = threading.Event() 

def run(self): 

#connect to server 
self.client.connect(('localhost',50000)) 

#self.client.setblocking(0) 
self.client_running = True 

#making two threads, one for receiving messages from server... 
listen = threading.Thread(target=self.listenToServer) 

#...and one for sending messages to server 
speak = threading.Thread(target=self.speakToServer) 

#not actually sure what daemon means 
listen.daemon = True 
speak.daemon = True 

#appending the threads to the thread-list 
self.running_threads.append((listen,"listen")) 
self.running_threads.append((speak, "speak")) 
listen.start() 
speak.start() 

while self.client_running: 

    #check if event is set, and if it is 
    #set while statement to false 

    if self.disconnected.isSet(): 
     self.client_running = False 

#closing the threads if the client goes down 
print "Client operating on its own" 
self.client.shutdown(1) 
self.client.close() 

#close threads 

#the script hangs at the for-loop below, and 
#refuses to close the listen-thread (and possibly 
#also the speak thread, but it never gets that far) 

for t in self.running_threads: 
    print "Waiting for " + t[1] + " to close..." 
    t[0].join() 
self.disconnected.clear() 
return 

#defining "speak"-function  
def speakToServer(self): 

#sends strings to server 
while self.client_running: 
    try: 
     send_data = sys.stdin.readline() 
     self.client.send(send_data) 

     #I want the "close" command 
     #to set an event flag, which is being read by all other threads, 
     #and, at the same time set the while statement to false 

     if send_data == "close\n": 
      print "Disconnecting..." 
      self.disconnected.set() 
      self.client_running = False 
    except socket.error, (value,message): 
     continue 
return 

#defining "listen"-function 
def listenToServer(self): 

#receives strings from server 
while self.client_running: 

    #check if event is set, and if it is 
    #set while statement to false 

    if self.disconnected.isSet(): 
     self.client_running = False 
    try: 
     data_recvd = self.client.recv(self.size) 
     print data_recvd 
    except socket.error, (value,message): 
     continue 
return 

if __name__ == "__main__": 
c = Client((socket.socket(socket.AF_INET, socket.SOCK_STREAM),'localhost')) 
c.run() 

、私は、このサーバ/クライアントシステムを取得し、実行しているとき、私は「床の注文を受けて、各クライアントで、我々はラボにここにあるいくつかのエレベーターのモデルにこのシステムを使用するか、「アップ」となりますダウン "コール。サーバーは配布アルゴリズムを実行し、要求された注文に最も適したクライアントのエレベータキューを更新します。私はそれが長い道のりであることに気づいていますが、その時には一歩を踏み出すべきだと思います=)

誰かがこれを調べる時間があるといいですね。前もって感謝します。

+2

私はあなたのコードが「うまくいかない」ということについて、より明確にするべきだと思います。あなたはどんな行動を期待していましたか?どのような行動を観察しましたか? 「うまくいきません」とは数多くのことを意味します。 –

答えて

1

私がこのコードで目にする最大の問題は、すぐに問題を簡単にデバッグするにはあまりにも多くの時間がかかることです。スレッディングは、ロジックがどのように非線形になるかによって非常に複雑になります。特にロックとの同期について心配する必要があるとき。

クライアントが互いにブロックしているのは、サーバーのlistenToClient()ループでserverSocketLockを使用しているためです。正直言って、これはあなたのコードでは今あなたの問題ではありませんが、デバッグを開始してソケットをブロッキングソケットに変えたときに問題になりました。各接続を独自のスレッドに入れてそこから読み取る場合は、ここでグローバル・サーバー・ロックを使用する必要はありません。彼らはスレッドの目的である同時に、彼ら自身のソケットからすべて読むことができます。ここで

はあなたに私のお勧めです:

  1. はあなたが必要とする、と
  2. は、あなたがそうであるように、クライアントが接続して持って最初から始まらないすべてのロックや余分な糸を取り除くと、あなたのようにスレッドに入れてください。そして、毎秒データを送信させるだけです。複数のクライアントを接続して送信し、サーバーがループして受信していることを確認します。この部分が機能したら、次の部分に進むことができます。
  3. あなたのソケットは非ブロックに設定されています。これは、データが準備ができていないときに、すべての人が自分のループで本当に速く回転するようにしています。スレッド化しているので、ブロックするように設定する必要があります。そうすると、リーダースレッドは単に座ってデータを待ってすぐに応答します。

ロックは、スレッドが共有リソースにアクセスするときに使用されます。あなたは明らかにいつでもスレッドがリストや値のようなサーバ属性を試して修正する必要があります。しかし、彼らが自分のプライベートソケットで作業しているときではありません。

読者を誘発するために使用しているイベントは、ここでは必要ないようです。あなたはクライアントを受け取った後、スレッドを開始します。それで、それは行く準備ができています。

一言で言えば...一度に1ビットずつ簡略化してテストしてください。働いているときは、もっと追加してください。現在、スレッドとロックが多すぎます。

はここにあなたのlistenToClient方法の簡単な例です:

def listenToClient(self, c): 
    while self.server_running: 
     try: 
      recvd_data = c.client.recv(self.size) 
      print "received:", c, recvd_data 
      if recvd_data == "" or recvd_data == "close\n": 
       print "Client " + str(c.address) + (" disconnected...") 
       return 

      print recvd_data 

     except socket.error, (value, message): 
      if value == 35: 
       continue 
      else: 
       print "Error:", value, message 
+0

いくつかの合理的なアドバイスのようです。私は時間がかかるときにこれを調べます。デバッグに時間をとっていただきありがとうございます。私が新しいコードを取得して実行したときに回答を投稿します)=== –

+0

確かです。あなたの簡略化されたコードをもう一度見直す必要がある場合は、私に教えてください。 – jdi

1

バックアップ作業、それを投げる - 一部。

プログラムを実装し、各プログラムをテストする必要があります。まず、プログラムのinputに取り組んでください。受信した入力をどのようにブロードキャストするか心配しないでください。代わりに、あなたが正常に、繰り返しソケットに入力を受け取ることができることを心配してください。ここまでは順調ですね。

ここでは、この入力に反応して、他の接続されたクライアントにブロードキャストすることを希望します。あまりにも悪い、あなたはまだそれを行うことはできません!なぜなら、私は上記の段落の1つの細部を残したからです。 PROTOCOLを設計する必要があります。

プロトコルとは何ですか?それはコミュニケーションのための一連のルールです。クライアントがデータの送信を完了した時点をサーバーはどのように知っていますか?特殊文字で終了していますか?または、メッセージの最初のバイトまたは2つとして送信されるメッセージのサイズをエンコードします。

これは多くの仕事になりますか? :-)

単純なプロトコルは何ですか? line-orientedプロトコルは簡単です。レコード終端記号 ' - n'の終わりに達するまで、一度に1文字ずつ読む。だから、クライアントがサーバーに、このようなレコードを送信します - ?

をので、あなたの子供がいます\ nは

HELO \ nは MSG DAVEは、あなたが設計し、この単純なプロトコルを持っていると仮定すると、それを実装します。現時点では、多くの人が苦労しているわけではありません!それを動作させることについて心配してください。

現在のプロトコルは1024バイトを読み込むことです。悪いことではないかもしれませんが、クライアントから1024バイトのメッセージを確実に送信してください。

プロトコルの設定が完了したら、入力に反応してください。しかし、今のところあなたは入力を読み取るものが必要です。それが終わると、それを使って何かをすることを心配することができます。

jdiが正しいとすると、処理するプログラムが多すぎます。ピースは簡単に修正できます。