2012-03-16 3 views
3

コースの割り当ての一部として、大きな画像ファイルを送信するためにUDPレイヤーのJava提供物の上に追加の信頼性レイヤーを追加するという課題がありました。 http://en.wikipedia.org/wiki/Go_back_Nデータグラムソケット私のUDPプログラムで他のスレッドが詰まっています

この問題の重要な点は、パケットを送信できることと同時に、肯定応答が古いパケットに含まれているかどうかを確認しているかどうかに依存していることです。ウィンドウを動かすことができます。

私は現在、2つのスレッドを持っています:ウィンドウにスペースがある場合、次のパケットを送信する1つ。継続的に何らかの着信確認を聞き、適切に反応するものがあります。

私の問題は、この2つのスレッドがsimulatneously動作しているかのようにプログラムをスレッド化する必要がありますが、実際にはACKReceiverスレッドが非常に不均衡な時間を費やしているようです。スレッドダンプから、DataSocket.receive()行に達すると、ここで実行をブロックし、その間に実行する機会を他のスレッドに与えないで、送信スレッドを少し枯らすように見えます。

私は問題がDatagramSocket.receiveが同期されているという事実とは何かである...しかし、問題に対する使用可能なソリューションを提供していないことをほのめかしているようだ、次の質問を見て持っていた:

Java Thread won't pause on I/O operation

ここで私のコードの送信者側のコードですが、相手側の受信機は完全に問題ないと確信しています(あるものは、それを動作させるためにスレッドを使用する必要はありませんでした)。 :

import java.io.*; 
import java.net.*; 
import java.nio.ByteBuffer; 

public class Sender3 { 
    short base = 0; 
    short nextSeqNum = 0; 
    DatagramPacket[] packets; 
    ByteBuffer bb; 
    String endSys; 
    int portNum; 
    String fileName; 
    int retryTime; 
    int windowSize; 
    DatagramSocket clientSocket; 
    InetAddress IPAddress; 
    boolean timedOut = false; 

    public Sender3(String endSys, int portNum, String fileName, int retryTime, int windowSize){ 
     this.endSys = endSys; 
     this.portNum = portNum; 
     this.fileName = fileName; 
     this.retryTime = retryTime; 
     this.windowSize = windowSize; 
    } 

    public static void main(String args[]) throws Exception{ 
     //Check for current arguments and assign them 
     if(args.length != 5){ 
      System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>"); 
      System.exit(1); 
     } 

     Sender3 sendy = new Sender3(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4])); 

     sendy.go(); 
    } 

    private void go() throws Exception{ 

     clientSocket = new DatagramSocket(); 



     bb = ByteBuffer.allocate(2); 
     byte[] picData = new byte[1021]; 
     byte[] sendData = new byte[1024]; 

     Thread.yield() 
     short seqNum = 0; 
     byte[] seqBytes = new byte[2]; 
     byte EOFFlag = 0; 
     boolean acknowledged = false; 
     int lastPacketRetrys = 0; 
     int resends = 0; 
     IPAddress = InetAddress.getByName(endSys); 

     FileInputStream imReader = new FileInputStream(new File(fileName)); 
     double fileSizeKb = imReader.available()/1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent. 
     int packetsNeeded = (int) Math.ceil(fileSizeKb); 
     packets = new DatagramPacket[packetsNeeded]; 
     long startTime = System.currentTimeMillis(); 
     long endTime; 
     double throughput; 

     //Create array of packets to send 
     for(int i = 0; i < packets.length; i++){ 
      if(i == packets.length - 1){ 
       EOFFlag = 1; 
       picData = new byte[imReader.available()]; 
       sendData = new byte[picData.length + 3]; 
      } 
      imReader.read(picData); 
      bb.putShort((short)i); 
      bb.flip(); 
      seqBytes = bb.array(); 
      bb.clear(); 
      System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length); 
      sendData[2] = EOFFlag; 
      System.arraycopy(picData, 0, sendData, 3, picData.length); 
      packets[i] = new DatagramPacket((byte[])sendData.clone(), sendData.length, IPAddress, portNum); 
     } 

     ACKGetter ackGet = new ACKGetter(); 
     Thread ackThread = new Thread(ackGet); 
     ackThread.start(); 

     //System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum); 

     while(base != packets.length){ 
      if(timedOut){ 
       //System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window"); 
       clientSocket.setSoTimeout(retryTime); 
       resends++; 
       if(nextSeqNum == packets.length) 
        lastPacketRetrys++; 
       //Resend all packets in window 
       for (int i = base; i < nextSeqNum; i++){ 
       // System.out.println("Resending packets with number: " + i); 
        clientSocket.send(packets[i]); 
       } 
       timedOut = false; 
      } 

      if(nextSeqNum - base < windowSize && nextSeqNum < packets.length){ 
       //System.out.println("sending packet with seqNum: " + nextSeqNum); 
       clientSocket.send(packets[nextSeqNum]); 
       if(base == nextSeqNum){ 
        clientSocket.setSoTimeout(retryTime); 
       } 
       nextSeqNum++; 
      } 
      else{ 
       //Thread.yield(); 
      } 

     } 




     if(lastPacketRetrys > 10){ 
      System.out.println("Last packet ACK was lost (we think). So we just gave up, number of retransmissions will probably be higher"); 
     } 
     endTime = System.currentTimeMillis(); 
     throughput = 1000 * fileSizeKb/(endTime - startTime); 
     clientSocket.close(); 
     imReader.close(); 
     System.out.println("Number of retransmissions: " + resends); 
     System.out.println("Average throughput is: " + throughput + "Kb/s"); 

    } 


    private class ACKGetter implements Runnable { 
     //Listen out for ACKs and update pointers accordingly 
     DatagramPacket ackPacket; 
     byte[] ackData = new byte[2]; 
     public void run() { 
      while(base != packets.length){ 
       if(base != nextSeqNum){ 
        try{ 
         ackPacket = new DatagramPacket(ackData, ackData.length); 
         clientSocket.receive(ackPacket); 
         ackData = ackPacket.getData(); 
         bb.put(ackData[0]); 
         bb.put(ackData[1]); 
         bb.flip(); 
         short ack = bb.getShort(); 
         bb.clear(); 
         if(base <= ack){ 
          //System.out.println("acknowledgement for base num: " + base + "ack num:" + ack); 
          base = (short) (ack + 1); 
          //If theres nothing left in window, stop timing, otherwise restart the timer 
          if(base == nextSeqNum){ 
           clientSocket.setSoTimeout(0); 
          } 
          else{ 
           clientSocket.setSoTimeout(retryTime); 
          } 
         } 
         else{ 
          //System.out.println("ACK didnt change anything: " + ack); 
         } 
        } 
        catch(Exception ex){ 
         timedOut = true; 
         //System.out.println("Packet timed out...resending.."); 
        } 
       } 

       Thread.yield(); 


      } 
     } 
    } 
} 
+0

申し訳ありませんが、何らかの理由で、私はランダムなThread.yield()ステートメントに、セミコロンを付けずにgoメソッドの先頭近くに貼り付けたようです。明らかにこれは間違いであり、そこにあるべきではありません –

答えて

5

送信者がclientSocket.setSoTimeout()に電話をかけている間に、リーダースレッドがclientSocket.receive()にあるため、ここでデッドロックが発生していると思います。あなたがパケットを待って、その後0 receiveハングのソケットタイムアウトで受信している場合

public synchronized void setSoTimeout(int timeout) throws SocketException { 
... 
public synchronized void receive(DatagramPacket p) throws IOException { 

:以下DatagramSocketメソッドの定義を参照してください。 SIGQUITを発行した場合、JVMはスレッドをダンプし、デッドロックを表示し、スタックフレームを追跡して送信者と受信者がスタックしている場所を確認することができます。

これを修正するには、私にとって非常に悪い練習のように聞こえるsetSoTimeoutの値を変更しないでください。 DatagramChannelを使用して、ソケットを非ブロックにし、NIO受信を使用して読み取りを行います。チャネルSelectorの使用方法の詳細については、NIO docsを参照してください。

+0

ありがとう、これは解決策のように思えます。しかし、私はsetSoTimeoutを使用することができない場合、どのように私はこの機能を達成することができますか?あなたが特定の時間枠内でACKを受け取っておらず、現在のウィンドウを再送するなら、何かが必要です。 –

+0

'Channel'をNIOの' Selector'で 'register'してから' selector.select (millis) 'は、チャンネルが読み込める場合は1を返し、そうでない場合は0を返します。 – Gray

+0

NIOのドキュメント@CraigInnesです。 http://docs.oracle.com/javase/1.4.2/docs/guide/nio/ – Gray

関連する問題