2011-02-11 4 views
2

のあまりに停止...1つのスレッド私は、次のコードは、下品に見えるかもしれないという事実を認識していますが、私はこれらの事に新しいですし、ちょうどそれが仕事を得るためにすべてを試み、早期にかかわらず、CyclicBarrierを

問題:私は(間違った方法で)CyclicBarrierを使用していても、常に同じであるように見えますが、スレッドがすぐに停止してベクターを印刷し、11のうち1つを「着信接続」メッセージがないままにします。おそらく私のループの最後の反復で何かがひどく間違っていますが、私は正確に何かを見つけることができません..今、プログラムは最後の接続を処理するために待機します。

public class VectorClockClient implements Runnable { 
/* 
* Attributes 
*/ 

/* 
* The client number is store to provide fast 
* array access when, for example, a thread's own 
* clock simply needs to be incremented. 
*/ 
private int clientNumber; 
private File configFile, inputFile; 
int[] vectorClock; 

/* 
* Constructor 
* @param 
* - File config 
* - int line 
* - File input 
* - int clients 
*/ 
public VectorClockClient(File config, int line, File input, int clients) { 
    /* 
    * Make sure that File handles aren't null and that 
    * the line number is valid. 
    */ 
    if (config != null && line >= 0 && input != null) { 
     configFile = config; 
     inputFile = input; 
     clientNumber = line; 
     /* 
     * Set the array size to the number of lines found in the 
     * config file and initialize with zero values. 
     */ 
     vectorClock = new int[clients]; 
     for (int i = 0; i < vectorClock.length; i++) { 
      vectorClock[i] = 0; 
     } 
    } 
} 

private int parsePort() { 
    int returnable = 0; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String strLine = ""; 
     for (int i = 0; i < clientNumber + 1; i++) { 
      strLine = br.readLine(); 
     } 
     String[] tokens = strLine.split(" "); 
     returnable = Integer.parseInt(tokens[1]); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    System.out.println("[" + clientNumber + "] returned with " + returnable + "."); 
    return returnable; 
} 

private int parsePort(int client) { 
    int returnable = 0; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String strLine = ""; 
     for (int i = 0; i < client; i++) { 
      strLine = br.readLine(); 
     } 
     String[] tokens = strLine.split(" "); 
     returnable = Integer.parseInt(tokens[1]); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return returnable; 
} 

private int parseAction(String s) { 
    int returnable = -1; 
    try { 
     FileInputStream fstream = new FileInputStream(configFile.getName()); 
     DataInputStream in = new DataInputStream(fstream); 
     BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
     String[] tokens = s.split(" "); 
     if (!(Integer.parseInt(tokens[0]) == this.clientNumber + 1)) { 
      return -1; 
     } 
     else { 
      if (tokens[1].equals("L")) { 
       vectorClock[clientNumber] += Integer.parseInt(tokens[2]); 
      } 
      else { 
       returnable = Integer.parseInt(tokens[2]); 
      } 
     } 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    return returnable; 
} 

/* 
* Do the actual work. 
*/ 
public void run() { 
    try { 
     InitClients.barrier.await(); 
    } 
    catch (Exception e) { 
     System.out.println(e); 
    } 
    int port = parsePort(); 
    String hostname = "localhost"; 
    String strLine; 
    ServerSocketChannel ssc; 
    SocketChannel sc; 
    FileInputStream fstream; 
    DataInputStream in; 
    BufferedReader br; 
    boolean eof = false; 
    try { 
     ssc = ServerSocketChannel.open(); 
     ssc.socket().bind(new InetSocketAddress(hostname, port)); 
     ssc.configureBlocking(false); 
     fstream = new FileInputStream("input_vector.txt"); 
     in = new DataInputStream(fstream); 
     br = new BufferedReader(new InputStreamReader(in)); 

     try { 
      InitClients.barrier.await(); 
     } 
     catch (Exception e) { 
      System.out.println(e); 
     } 

     while (true && (eof == false)) { 
      sc = ssc.accept(); 

      if (sc == null) { 
       if ((strLine = br.readLine()) != null) { 
        int result = parseAction(strLine); 
        if (result >= 0) { 
         //System.out.println("[" + (clientNumber + 1) 
         //+ "] Send a message to " + result + "."); 
         try { 
          SocketChannel client = SocketChannel.open(); 
          client.configureBlocking(true); 
          client.connect(
            new InetSocketAddress("localhost", 
            parsePort(result))); 
          //ByteBuffer buf = ByteBuffer.allocateDirect(32); 
          //buf.put((byte)0xFF); 
          //buf.flip(); 
          //vectorClock[clientNumber] += 1; 
          //int numBytesWritten = client.write(buf); 
          String obj = Integer.toString(clientNumber+1); 
          ObjectOutputStream oos = new 
            ObjectOutputStream(
            client.socket().getOutputStream()); 
          oos.writeObject(obj); 
          oos.close(); 
         } 
         catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
       else { 
        eof = true; 
       } 
      } 
      else { 
       ObjectInputStream ois = new 
         ObjectInputStream(sc.socket().getInputStream()); 
       String clientNumberString = (String)ois.readObject(); 
       System.out.println("At {Client[" + (clientNumber + 1) 
         + "]}Incoming connection from: " 
         + sc.socket().getRemoteSocketAddress() 
         + " from {Client[" + clientNumberString + "]}"); 
       sc.close(); 
      } 
      try { 
       InitClients.barrier.await(); 
      } 
      catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
    printVector(); 
} 

private void printVector() { 
    System.out.print("{Client[" + (clientNumber + 1) + "]}{"); 
    for (int i = 0; i < vectorClock.length; i++) { 
     System.out.print(vectorClock[i] + "\t"); 
    } 
    System.out.println("}"); 
} 

}
ここでは、明確にするために使用されるファイルのフォーマットがあります。 Configにはクライアントが使用するホスト名とポートが含まれ、スレッドは入力ファイルの行で、「このクライアントはそのクライアントにメッセージを送信する」または「このクライアントは論理時計を一定の値で増分する」のいずれかを意味します。

1 M 2(Mは、メッセージを送信する手段)
2 M 3
3 M 4
2 L 7(Lがインクリメントクロックを意味する)
2 M 1
...
127.0.0.1 9000
127.0.0.1 9001
127.0.0.1 9002
127.0.0.1 9003
。 ..

答えて

0

着信ソケット接続を期待しているときのロジックを見ていきます。あなたの質問から、あなたが着信ソケット接続(潜在的にすべての発信メッセージの後に接続が入っていますか?受信ソケットで非ブロック入出力を使用しているので、受信ソケットが確立される前にwhile文がループする可能性が常にあります。その結果、スレッドは接続を継続することなくファイルから次の行を読み込み続けることができます。ファイルの終わりに達すると終了状態に達するので、着信ソケット接続を逃す可能性があります。

ファイルから読み取ったとき、メッセージを送信したとき、および着信接続を受信したときに表示される簡単なプリントアウトを追加します。それは、特定のスレッドが予想される着信接続を欠いているかどうかをすぐに知らせるはずです。問題がノンブロッキングI/Oによるものであることが判明した場合は、着信ソケットが必要な場合や、期待している着信ソケット数を把握しているコントロールを実装している場合は、非ブロッキングI/Oを無効にする必要がありますその目標が達成されるまで続きます。

これが役に立ちます。

+0

特定のスレッドが接続要求を受け取っていない可能性があります。入力ファイルは、誰が何をすべきかを決定するために使用されます。 現時点では、非ブロッキングクライアントを有効にすると、プログラムは接続されていないソケットに関するエラーを生成します。また、非ブロッキングサーバーを無効にすると、各スレッドが接続を待っていて、入力ファイルを読み取っているユーザーがいないため、プログラムはハングします。私もセレクタを実装しようとしましたが、問題はselect()呼び出しでループがブロックされてしまうことでした。 – treiman

+0

1つのオプションは、メッセージの受信を期待するタイミングに応じて、configureBlockingメソッドを使用してブロックI/Oをプログラムで有効/無効にすることです。最初はソケットを非ブロックに設定します。着信応答が必要な場合は、接続が受信されるまでブロッキングに変更します。受信した後、再び必要になるまでノンブロッキングに戻すことができます。代わりに、このスイッチは、whileループの各繰り返しで接続を実行するのではなく、接続を待つときだけ接続をリスンすることです。 –

関連する問題

 関連する問題