2017-03-23 5 views
1

私はWebSocketを初めて使用しているので、私のせいであるが、WebSocket(クロム)からサーバー(C#、TCPListener)のデータをwhileループ(ちょうどテストするために)データがサーバを適切に届かない。最初のバイトは通常なくなっており、キーも2つのメッセージの間で混ざり合っていることがあります。この結果、私のサーバーがクラッシュします。これは、データが不規則に送信された場合や超高速で送信されない場合には発生しません。ここで非常に高速なデータ送信時にWebソケットが正しく動作しない

は、サーバーのレシーバーコードです:ここで

#region Receiver 

    public class Receiver 
    { 
     private const int DEFAULT_DATA_LIMIT = 256; // Bytes 

     private const int HEADER_LENGTH = 2; 
     private const int KEYS_LENGTH = 4; 

     private const int SHORT_BYTES = 2; 
     private const int LONG_BYTES = 8; 

     private const int SHORT_DATA = 126; 
     private const int LONG_DATA = 127; 

     private readonly Socket TargetSocket; 

     private readonly object DataLock = new object(); 
     private readonly object amountOfDataReceivedLock = new object(); 
     private readonly object RecevingLock = new object(); 
     private readonly object CompletingRecieveLock = new object(); 

     private bool receiving; 

     private int _amoutOfDataReceived; 
     private byte[] Data; 

     private int amountOfDataReceived 
     { 
      get 
      { 
       lock(amountOfDataReceivedLock) 
        return _amoutOfDataReceived; 
      } 
      set 
      { 
       lock(amountOfDataReceivedLock) 
        _amoutOfDataReceived = value; 
      } 
     } 

     public int Capacity 
     { 
      get 
      { 
       return Data.Length; 
      } 
     } 

     public int FreeSpace 
     { 
      get 
      { 
       return Capacity - amountOfDataReceived; 
      } 
     } 

     public bool ReceivingData 
     { 
      get 
      { 
       return receiving; 
      } 
     } 

     public bool ChunkReady 
     { 
      get 
      { 
       int wholeChunkLength = WholeChunkLength(); 

       if(wholeChunkLength == -1) 
        return false; 

       return WholeChunkLength() <= amountOfDataReceived; 
      } 
     } 

     public Receiver(Socket socket, int dataLimit) 
     { 
      this.TargetSocket = socket; 
      Data = new byte[dataLimit]; 
     } 

     public Receiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT) { } 

     private int ChunkDataLength() 
     { 
      lock(DataLock) 
      { 
       lock(amountOfDataReceivedLock) 
       { 
        if(amountOfDataReceived < 1) 
         return -1; 

        int lengthInfoIndex = HEADER_LENGTH - 1; 

        if(amountOfDataReceived < lengthInfoIndex + 1) 
         return -1; 

        int rawLength = Data[lengthInfoIndex] -128; 

        int bytesRequiredToGetLength = 0; 

        if(rawLength == SHORT_DATA) 
         bytesRequiredToGetLength = SHORT_BYTES; 
        else if(rawLength == LONG_DATA) 
         bytesRequiredToGetLength = LONG_BYTES; 
        else 
         return rawLength; 

        if(amountOfDataReceived < lengthInfoIndex + 1 + bytesRequiredToGetLength) 
         return -1; 

        return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength); 
       } 
      } 
     } 

     private int WholeChunkLength() 
     { 
      int dataLength = ChunkDataLength(); 
      int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES; 

      if(dataLength == -1) 
       return -1; 

      return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength; 
     } 

     private void ReceiveDataInternal(int dataToReceiveLength) 
     { 
      if(dataToReceiveLength == 0) 
       return; 

      lock(RecevingLock) 
      { 
       if(receiving) 
        return; 

       receiving = true; 
      } 

      if(dataToReceiveLength > FreeSpace) 
       dataToReceiveLength = FreeSpace; 

      TargetSocket.BeginReceive(Data, amountOfDataReceived, dataToReceiveLength, SocketFlags.None, result => 
      { 
       OnRecevingComplete(result, dataToReceiveLength); 
      }, null); 

     } 

     private void OnRecevingComplete(System.IAsyncResult result, int receivedDataLength) 
     { 
      lock(CompletingRecieveLock) // This is not needed really 
      { 
       TargetSocket.EndReceive(result); 
       this.amountOfDataReceived += receivedDataLength; 
       receiving = false; 
      } 
     } 

     public void ReceiveData() 
     { 
      ReceiveDataInternal(TargetSocket.Available); 
     } 

     public byte[] GetChunk() 
     { 
      lock(DataLock) 
      { 
       lock(amountOfDataReceivedLock) 
       { 
        int chunkLength = WholeChunkLength(); 

        if(chunkLength == -1 || chunkLength > amountOfDataReceived) 
         return null; 
        //       throw new System.InvalidOperationException("Chunk is yet not ready!"); 

        byte[] chunk = new byte[chunkLength]; 

        for(int i = 0; i < chunkLength; i++) 
         chunk[i] = Data[i]; 

        ArrayUtilities<byte>.ShiftArrayLeft(Data, chunkLength, amountOfDataReceived); 
        amountOfDataReceived -= chunkLength; 

        return chunk; 
       } 
      } 
     } 

    } 

    #endregion 

は、機能自体は以外の新しいスレッド上でこのような何か(、によって呼び出され

private void Update() 
    { 
     if(!running) 
      return; 

     for(int i = 0; i < clients.TotalClients; i++) 
     { 
      if(!clients[i].IsReady) 
       continue; 

      var recievier = clients[i].GetReciever(); 

      if(recievier.recievier.FreeSpace > 0 && clients[i].DataAvaliable && !recievier.Receiving) 
       recievier.ReceiveData(); 

      if(recievier.ChunkReady) 
      { 
       var data = recievier.GetChunk(); 

       Utilities.WSFormatter.DecodeMessage(data); 

       int SI = Utilities.WSFormatter.MessageStartIndex(data); 

       System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(data, SI, data.Length - SI)); 
      } 

     } 
    } 

レシーバのクラスを呼び出す機能ですデフォルトのもの):

while (true) 
{ 
    Update(); 
    System.Threading.Thread.Sleep(1); 
} 

ここにWebSocketコード:

 Engine.Loader.loadEngine(function() { 

     var client = new Engine.Client("ws:192.168.1.105:8080"); 
     var connected = false; 

     client.addConnectListener(function() 
     { 
      connected = true; 
      console.log("Connection successful!");    
     }); 

     client.addRecieveListener(function(data) 
     { 
      console.log(data); 
     }); 

     var gl = new Engine.GameLoop(new Engine.Renderer(), new Engine.Input(), client); 
     gl.start(); 

     var i = 0; 

     gl.addEventListener("UPDATE", function() 
     { 
      /*The code that works*/ 

      if(connected) 
       client.sendString("" + i++); 

      /*The code that causes problems*/ 

      var j = 10; 

      while(connected && j-- > 0) 
       client.sendString("" + i++); 

     }); 

    }); 

EDIT:UPDATEイベントがwindow.requestAnimationFrame

EDITによって呼び出されます:私は、以前の投稿の答えにも失敗しました。これは、エラーが発生する前にdata []が満たされたときにのみ機能します。だから私がデータの容量を増やすならば、データを受け取る時間も増やす必要があります。

編集:すべての作業を完了しました。それには複数の問題がありました。データはSocket.BeginReceive()メソッドが内部でやっていたものとDataReceiver.GetChunk()によって同時に変更されていることがわかります。 これで、最初に一時バッファ内のデータを受信して​​から、完全なイベントがロックされている間にメインバッファに書き込んで、他のスレッドがそれを混乱させることはありません(私の知る限り、完全なメソッドは呼び出されませんBeginRecieve関数が呼び出されたスレッドで)。

また、@vtortolaは、完全なイベントがトリガされたときにすべてのデータがバッファに入っていない可能性があると言いました。

実際に動作するスクリプトは次のとおりです。これは、再書かれていますが、いくつかの方法は、古いスクリプト(ええ、私は怠け者だ)からコピーされます。

public class DataReceiver 
{ 
private const int DEFAULT_DIRECT_BUFFER_LIMIT = 256; 
private const int DEFAULT_DATA_LIMIT = 256; // bytes 

private const int HEADER_LENGTH = 2; 
private const int KEYS_LENGTH = 4; 

private const int SHORT_DATA = 126; 
private const int SHORT_BYTES = 2; 

private const int LONG_DATA = 127; 
private const int LONG_BYTES = 8; 

private readonly Socket TargetSocket; 

private readonly object dataUpdatingLock = new object(); 

private bool receivingData; 

private byte[] directRecieveBuffer; 

private int dataReceived; 
private byte[] data; 

public int Capacity 
{ 
    get 
    { 
     return data.Length; 
    } 
} 

public int AmountOfDataReceived 
{ 
    get 
    { 
     return dataReceived; 
    } 
} 

public int FreeSpace 
{ 
    get 
    { 
     return Capacity - AmountOfDataReceived; 
    } 
} 

public bool ReceivingData 
{ 
    get 
    { 
     return receivingData; 
    } 
} 

public bool ChunkReady 
{ 
    get 
    { 
     int chunkLength = WholeChunkLength(); 

     if(chunkLength < 1) 
      return false; 

     return chunkLength <= dataReceived; 
    } 
} 

private DataReceiver(Socket socket, int bufferLength, int directBufferLength) 
{ 
    this.TargetSocket = socket; 
    this.data = new byte[bufferLength]; 
    this.directRecieveBuffer = new byte[directBufferLength]; 
} 

public DataReceiver(Socket socket, int bufferLength) : this(socket, bufferLength, DEFAULT_DIRECT_BUFFER_LIMIT) { } 

public DataReceiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT, DEFAULT_DIRECT_BUFFER_LIMIT) { } 

private void ReceiveDataInternally() 
{ 
    receivingData = true; 

    int expectedDataLength = TargetSocket.Available; 

    if(expectedDataLength > FreeSpace) 
     expectedDataLength = FreeSpace; 

    if(expectedDataLength > directRecieveBuffer.Length) 
     expectedDataLength = directRecieveBuffer.Length; 

    TargetSocket.BeginReceive(directRecieveBuffer, 0, expectedDataLength, SocketFlags.None, result => 
    { 
     int receivedDataLength = TargetSocket.EndReceive(result); 

     lock(dataUpdatingLock) 
     { 
      for(int i = 0; i < receivedDataLength; i++) 
      { 
       data[dataReceived++] = directRecieveBuffer[i]; 
       directRecieveBuffer[i] = 0; 
      } 
     } 

     receivingData = false; 

    }, null); 
} 

public byte[] GetChunk() 
{ 
    int chunkLength = WholeChunkLength(); 

    if(chunkLength == -1 || chunkLength > dataReceived) 
     return null; 

    byte[] chunk = new byte[chunkLength]; 

    for(int i = 0; i < chunkLength; i++) 
     chunk[i] = data[i]; 

    lock(dataUpdatingLock) 
    { 
     ArrayUtilities<byte>.ShiftArrayLeft(data, chunkLength, dataReceived); 
     dataReceived -= chunkLength; 
    } 

    return chunk; 
} 

private int ChunkDataLength() 
{ 
    if(dataReceived < 1) 
     return -1; 

    int lengthInfoIndex = HEADER_LENGTH - 1; 

    if(dataReceived < HEADER_LENGTH) 
     return -1; 

    int rawLength = data[lengthInfoIndex] & 127; 

    int bytesRequiredToGetLength = 0; 

    if(rawLength == SHORT_DATA) 
     bytesRequiredToGetLength = SHORT_BYTES; 
    else if(rawLength == LONG_DATA) 
     bytesRequiredToGetLength = LONG_BYTES; 
    else 
     return rawLength; 

    if(dataReceived < HEADER_LENGTH + bytesRequiredToGetLength) 
     return -1; 

    return Utilities.ToInt32(data, lengthInfoIndex + 1, bytesRequiredToGetLength); 
} 

private int WholeChunkLength() 
{ 
    int dataLength = ChunkDataLength(); 
    int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES; 

    if(dataLength == -1) 
     return -1; 

    return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength; 
} 

public void StartReceving() 
{ 
    if(!receivingData) 
     ReceiveDataInternally(); 
} 
} 
+0

TCPはストリームなのですか?したがって、1つのメッセージとして複数のメッセージを受信することができます。これは、短時間に複数のメッセージを送信する場合に発生します。それらは統合されます。また、メッセージは複数の受信で分割される可能性があります。 'client.sendString(" "+ i ++);' '長さは何ですか?どのように次のメッセージが始まるかはどうやって分かりますか? –

+0

Webソケットには、新しいメッセージのそれぞれが129で始まり、実際のメッセージの長さの後にキーが続き、実際のメッセージが続きます。 1バイト(テキストフレームの場合)、1バイト(実際のデータの長さ情報)、4バイトのキー、次にバッファの2バイト目の長さに加えて、 。私はTCPが1つとして多くのメッセージを受け取ることができ、それに対処するために私の受信機を作ったことがわかりますが、なぜテキストフレームが何故最善で、なぜキーが無効であるのか説明しません。 – KidCoder

+0

私は、新しいメッセージがいつ前のメッセージの長さで始まるかを知っています。 – KidCoder

答えて

0

あなたがrecievier.ReceiveData();を呼び出すと、あなたがデータを引き受けるが、方法が終了した時点でリードされたが、それは適切ではありません。そのメソッドへの呼び出しは、読み取りを開始するだけで、OnRecevingCompleteを呼び出す匿名の代理人で非同期で終了します。これは、アプリが単なる単なる呼び出し以上の負荷を持っている場合に、より顕著になります。 TcpListenerを使用すると、SocketBeginXXXEndXXXではなく、async/awaitで作業する方が簡単です。

一度読み込みが完了すると、すべてのデータはバッファに保存されていても、読み込まれていない可能性があります。たとえば、6バイトを読み取る場合は、6回、3回、またはたった1回の読み取りが必要になることがあります。whileループが必要です。

は、ここで例を参照してください:サイドノートとしてHow can I read from a socket repeatedly?

public void ContinuousReceive(){ 
    byte[] buffer = new byte[1024]; 
    bool terminationCodeReceived = false; 
    while(!terminationCodeReceived){ 
     try{ 
      if(server.Receive(buffer)>0){ 
      // We got something 
      // Parse the received data and check if the termination code 
      // is received or not 
      } 
     }catch (SocketException e){ 
      Console.WriteLine("Oops! Something bad happened:" + e.Message); 
     } 
    } 
    } 

:あなたはInt32として8つのバイトを解析する場合、それがオーバーフローするよう

return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength); 

Int32は、4バイトです。ヘッダは、複数のスレッドを読み取ることができず、複数で書き込むことができない= 125、126または127 <

を示す場合に応じて、:また、長さはushortuint及びulong(符号なし整数IE)として来ることを覚えソケット上のスレッド。独立したものから同時に読み書きすることはできますが、同じ操作を並行して行うことはできません。予測できないエラーが発生します。あなたはロックを持たず、クラスをスレッドセーフではないと考えてください(たとえば、NetworkStreamです)。複数のスレッドがネットワーキングプログラミングのパフォーマンスを向上させるわけではありません。代わりに、コードを非同期にすることに集中します。

Good old book that I loved

+0

Thanks @vtortola!私のUtilities.Int32メソッドは、必要以上に多くのバイトを処理します。私はパラメータとして4バイトを提供するにはあまりにも怠惰でしたが、すべてが機能し、オーバーフローがなく、正しい結果を得ます:)。あなたはrecitalier.ReceiveData()を呼び出すと_italic_と言った。メソッドが終了した時点でデータが読み込まれたものの、正しくはないとみなします。 _italic_しかし、データが受信者によって読み取られたかどうかを確認しました。 ChunkReadyプロパティ。しかし、問題はあなたが第2段落で言ったことのために起こったようです。私はあなたが言ったことに従ってクラスを書き直すでしょう! – KidCoder

+0

OKは働いています@vtortola助けてくれてありがとう。 1つではなく、多くの問題がありました。私は作業スクリプトで質問を更新しました!そして、あなたが言ったように、私はロックを削除し、1つのスレッドからのみそれを使用します。 – KidCoder

関連する問題