2016-07-09 18 views
1

以下のコードは動作し、予定時刻にメッセージを送信しますが、タイマーがスケジュールされたタスクを実行するたびに新しいソケットを開くのは良い解決策ではないと思います。私が望むのは、runメソッドでsocketを一度だけ開いて、クラスの新しいインスタンスがtimerで作成されるたびにSendMessageクラスにアクセスすることです。そのように動作しない場合は、1つのメッセージしか送信せずに送信を停止します。また、スレッドセーフなコードやヒントについては、いくつかの批評家にはうれしいです。ソケットは一度だけメッセージを送信します

public class Client implements Runnable{ 

// Client Constructor here 

@Override 
public void run(){ 
    //SENDS ONLY ONE MESSAGE 
    pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(), 0, 1000/mps); 
} 

private class SendMessage extends TimerTask{ 

    private int id; 

    @Override 
    public void run() { 

     try 
      { // THIS WORKS FINE, SENDS MESSAGES AT SCHEDULED TIME      
      pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

      OutputStream outToServer = pitcherSocket.getOutputStream(); 

      DataOutputStream out = new DataOutputStream(outToServer); 

      out.writeInt(id); 

      out.flush(); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } 
     } 
    } 
} 

EDIT:WHOLEのCODE

CLIENT

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.InetAddress; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Timer; 
import java.util.TimerTask; 

public class Pitcher implements Runnable{ 

private int port; 
private int mps; 
private int size; 
private String hostname; 
private List<Integer> messageIds = Collections.synchronizedList(new  ArrayList<Integer>()); 
private Socket pitcherSocket; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 


public Pitcher(int port, int mps, int size, String hostname) { 

    this.port = port; 
    this.mps = mps; 
    this.size = size; 
    this.hostname = hostname; 
} 

@Override 
public void run(){ 

    System.out.println("Pitcher running..."); 
    System.out.println(); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(), 0, 1000/mps); 

    timer.schedule(new DisplayStatistics(), 0, 1000/mps); 

} 

//Nested class that sends messages 
private class SendMessage extends TimerTask{ 

    private int numberOfSentMessages = 0; 
    private int id; 

    @Override 
    public void run() { 

     try {       
      pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 

      OutputStream outToServer = pitcherSocket.getOutputStream(); 

      DataOutputStream out = new DataOutputStream(outToServer); 

      //send message size 
      out.writeInt(size); 

      //message id is same as number of the sent message 
      id = numberOfSentMessages + 1; 
      out.writeInt(id); 
      messageIds.add(id); 



      //get system timestamp 
      long currentTimestamp = System.currentTimeMillis(); 
      out.writeLong(currentTimestamp); 

      //fill in the rest- 
      byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];  //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes) 
      out.write(rest); 

      out.flush(); 

      numberOfSentMessages++; 


      InputStream inFromServer = pitcherSocket.getInputStream(); 
      DataInputStream in = new DataInputStream(inFromServer); 

      Integer catcherMessageSize = in.readInt(); 
      Integer catcherId = in.readInt(); 
      long catcherTimestamp = in.readLong(); 

      System.out.println("Sent message:  " + size + " " + id + " " + currentTimestamp + "..."); 
      System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "..."); 
      System.out.println(); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } 

    } 

} 

} 

サーバ

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.SocketTimeoutException; 

public class Catcher implements Runnable{ 

private int port; 
private String bind; 
private ServerSocket serverSocket; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 

public Catcher(int port, String bind) { 

    this.port = port; 
    this.bind = bind; 
} 

@Override 
public void run() { 

    System.out.println("Catcher running..."); 
    System.out.println(); 

    try { 
     serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind)); 
    } 
    catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

    while(true){ 

     try 
     {    
      Socket server = serverSocket.accept(); 

      DataInputStream in = new DataInputStream(server.getInputStream()); 

      Integer pitcherMessageSize = in.readInt(); 
      Integer pitcherId = in.readInt(); 
      long pitcherTimestamp = in.readLong(); 

      DataOutputStream out = new DataOutputStream(server.getOutputStream()); 

      //message id and size are sent back 
      out.writeInt(pitcherMessageSize); 
      out.writeInt(pitcherId); 

      //send back current time 
      long currentTimestamp = System.currentTimeMillis(); 
      out.writeLong(currentTimestamp); 

      //fill in the rest 
      byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes) 
      out.write(rest); 

      out.flush(); 

      System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "..."); 
      System.out.println("Sent message:  " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "..."); 
      System.out.println(); 

      //server.close(); 

     } 
     catch(SocketTimeoutException s){ 
      System.out.println("Socket timed out!"); 
      break; 
     } 
     catch(IOException e){ 
      e.printStackTrace(); 
      break; 
     } 
     } 
} 
} 
+0

例外はありますか?それが送信を停止するとき、プログラムは終了するか、それともちょうどぶら下がっていますか? – niceman

+0

例外はありません。サーバーは最初のIDを受け取り、引き続きリスンします。クライアントのタイマーはまだSendMessageクラスを実行して送信しようとしますが、最初の繰り返しの後にnothindが送信されます – asdf

+0

私が見るところでは、あなたはスケジュールに基づいて 'SendMessage'を作成して実行しています。 'SendMessage'は、作成されるたびに' Socket'を介して再接続を試みます。 –

答えて

-2

JavaのSocketクラスはスレッドセーフではありません。複数のスレッドが同じSocketオブジェクトにアクセスするようにするには、アクションを同期させる必要があります。これは、すべてのSendMessageスレッドに、ロックとして機能する共通オブジェクトを提供することによって実行できます。使用する予定のソケット操作ごとにオブジェクトが必要です(例:読み書き)。次に、Socketオブジェクトを別々のメソッドに呼び出し、そのオブジェクトの周りでそれらを同期させるすべてのアクションをリファクタリングします。例えば。 read操作のために、Socket.readを呼び出してこのメ​​ソッドをロックオブジェクトの周りに同期させるSendMessageの中にread()というメソッドを持たせることができます。

private class SendMessage extends TimerTask{ 

    private Object readLock; 
    private Socket socket; 

    public SendMessage(Object readLock, Socket socket) { 
     this.readLock = readLock; 
     this.socket = socket; 
    } 

    public void readFromSocket() { 
     synchronized(readLock) { 
       socket.read(); 
     } 
    } 

    @Override 
    public void run() { 
     readFromSocket(); 
     // do other stuff 
    } 

} 
+0

これは彼の問題の原因を説明していません。 –

+0

問題は、複数のスレッドで同じソケットオブジェクトを使用する場合にのみ発生します。したがって、ソケットがスレッドセーフではないために発生している可能性があります。それが問題の原因だと思わない理由を詳しく説明してください。 – Soggiorno

+0

関連:http://stackoverflow.com/questions/13545578/is-java-socket-multi-thread-safe – Soggiorno

0

ソケットとSendMessageのDataOutputStreamメンバー変数の両方を作成することについて考えましたか。これはあなたに大まかなスタートを与えるためのコードです。あなたは、おそらく私がいることを考えるコード全体を見ることができるされた後

private class SendMessage extends TimerTask { 
    private int id = 10; 
    private Socket pitchSocket; 
    private DataOutputStream out; 

    public SendMessage(Socket socket) { 
     this.pitchSocket = socket; 
     try{ 
      out = new DataOutputStream(pitchSocket.getOutputStream()); 
     } catch(IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      out.writeInt(id); 
      out.flush(); 
     } catch(IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

まだ同じ動作をしています:/ – asdf

+0

オリジナルの投稿であると誤解している可能性があります。複数のSendMessengerを作成していますか? – CodeJockNYC

0

...ソケットが開いているかどうかをチェックし、現在の1が閉じている場合は、新しいものを作成することができるというようないくつかの機能強化を置くことになるでしょう私はクライアント側ではなくサーバー側でより重視されていると思うが、あなたは間違いなくスレッドの問題を抱えている。あなたのサーバーはシングルスレッドです。つまり、一度に1つのリクエストしか処理できません。マルチスレッドサーバーが必要です。私はあなたのコードをリファクタリングして、マルチスレッドのCatcherの例を作成しました。私はTheadクラスを使用して、このすべてをやっています。これは少し古いかもしれません。 java.util.concurrentを見てみるといいかもしれませんが、おそらく最新のものがあります。

package clientserver; 

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.SocketTimeoutException; 

public class Catcher implements Runnable{ 

private int port; 
private String bind; 
private ServerSocket serverSocket; 



public Catcher(int port, String bind) { 

    this.port = port; 
    this.bind = bind; 
} 

@Override 
public void run() { 

    System.out.println("Catcher running..."); 
    System.out.println(); 

    try { 
     serverSocket = new ServerSocket(port, 100, InetAddress.getByName(bind)); 
    } 
    catch (IOException e1) { 
     e1.printStackTrace(); 
    } 

    while(true){ 
     try 
     {    
      new Thread(new CatcherHandler(serverSocket.accept())).start(); 
      Thread.sleep(1000); 

     } 
     catch(SocketTimeoutException s){ 
      System.out.println("Socket timed out!"); 
      break; 
     } 
     catch(IOException e){ 
      e.printStackTrace(); 
      break; 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

public static void main(String[] argv){ 
    new Thread(new Catcher(8093, "localhost")).start();; 

} 
} 

class CatcherHandler implements Runnable{ 
    Socket server; 
    DataOutputStream out; 
    DataInputStream in; 

    private static final int INT_SIZE = 4; 
    private static final int LONG_SIZE = 8; 

    public CatcherHandler(Socket server) { 
     super(); 
     this.server = server; 
     try { 
      in = new DataInputStream(server.getInputStream()); 
      out = new DataOutputStream(server.getOutputStream()); 

     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 



    @Override 
    public void run() { 
     try{ 
      if(in.available() > 0){ 

       Integer pitcherMessageSize = in.readInt(); 
       Integer pitcherId = in.readInt(); 
       long pitcherTimestamp = in.readLong(); 

       //message id and size are sent back 
       out.writeInt(pitcherMessageSize); 
       out.writeInt(pitcherId); 

       //send back current time 
       long currentTimestamp = System.currentTimeMillis(); 
       out.writeLong(currentTimestamp); 

       //fill in the rest 
       byte[] rest = new byte[pitcherMessageSize - 2 * INT_SIZE - LONG_SIZE]; //message size(default 300 bytes) - size(4 bytes) - message id(4 bytes) - timestamp(8 bytes) 
       out.write(rest); 

       out.flush(); 

       System.out.println("Received message: " + pitcherMessageSize + " " + pitcherId + " " + pitcherTimestamp + "..."); 
       System.out.println("Sent message:  " + pitcherMessageSize + " " + pitcherId + " " + currentTimestamp + "..."); 
       System.out.println(); 
       Thread.sleep(1000); 

      } 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     }finally{} 
     //server.close(); 

    } 
} 

さらに、私はあなたのクライアントを1つのソケットを使用して安全に動くようにリファクタリングしました。今SendMessageは、引数としてDataInputStreamとDataOutputSteamを受け取ります。

import java.io.DataInputStream; 
import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.InetAddress; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Timer; 
import java.util.TimerTask; 

public class Pitcher implements Runnable{ 

private int port; 
private int mps; 
private int size; 
private String hostname; 
private List<Integer> messageIds = Collections.synchronizedList(new  ArrayList<Integer>()); 
private Socket pitcherSocket; 
private DataOutputStream out; 
private DataInputStream in; 

//constatns, integer is 4 bytes, long is 8 bytes 
private static final int INT_SIZE = 4; 
private static final int LONG_SIZE = 8; 


public Pitcher(int port, int mps, int size, String hostname) { 

    this.port = port; 
    this.mps = mps; 
    this.size = size; 
    this.hostname = hostname; 



    try { 
     this.pitcherSocket = new Socket(InetAddress.getByName(hostname), port); 
     out = new DataOutputStream(pitcherSocket.getOutputStream()); 
     in = new DataInputStream(pitcherSocket.getInputStream()); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 


} 

public static void main(String[] argv) throws Exception{ 
    for(int i = 0; i < 10; i++){ 
     new Thread(new Pitcher(8093, 1, 200, "localhost")).start(); 
     Thread.sleep(1000); 
    } 

    Thread.sleep(10000); 
} 

@Override 
public void run(){ 

    System.out.println("Pitcher running..."); 
    System.out.println(); 

    Timer timer = new Timer(); 

    timer.schedule(new SendMessage(out, in), 0, 1000); 

    //timer.schedule(new DisplayStatistics(), 0, 1000); 

} 

//Nested class that sends messages 
private class SendMessage extends TimerTask{ 

    private int numberOfSentMessages = 0; 
    private int id; 
    private DataOutputStream out; 
    private DataInputStream in; 

    public SendMessage(DataOutputStream out, DataInputStream in){ 
     this.out = out; 
     this.in = in; 
    } 

    @Override 
    public void run() { 

     try {       
      long currentTimestamp = 0L; 
      synchronized(out){ 
       //send message size 
       out.writeInt(size); 

       //message id is same as number of the sent message 
       id = numberOfSentMessages + 1; 
       out.writeInt(id); 
       messageIds.add(id); 



       //get system timestamp 
       currentTimestamp = System.currentTimeMillis(); 
       out.writeLong(currentTimestamp); 

       //fill in the rest- 
       byte[] rest = new byte[size - 2 * INT_SIZE - LONG_SIZE];  //message size(default 300 bytes) - size(4 bytes) - message id(4 bytse) - timestamp(8 bytes) 
       out.write(rest); 

       out.flush(); 
      } 
      numberOfSentMessages++; 

      long catcherTimestamp = 0L; 
      Integer catcherMessageSize; 
      Integer catcherId; 
      synchronized(in){ 
       catcherMessageSize = in.readInt(); 
       catcherId = in.readInt(); 
       catcherTimestamp = in.readLong(); 
      } 
      System.out.println("Sent message:  " + size + " " + id + " " + currentTimestamp + "..."); 
      System.out.println("Received message: " + catcherMessageSize + " " + catcherId + " " + catcherTimestamp + "..."); 
      System.out.println(); 
      Thread.sleep(1000); 

      }catch(IOException e) 
      { 
      e.printStackTrace(); 
      } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

} 

} 
+0

ありがとうございます!私は明日このコードを実装しようとし、すべてがうまくいるかどうかを知らせます。なぜあなたはthread.sleep(X)を使っているのですか?私はスレッドに新しいです。 – asdf

+0

ええ、私は他のスレッドを実行できるようにスリープを使用しています。基本的に、スリープメソッドを呼び出すと、スケジューラは他のスレッドを実行できるようになります。 – CodeJockNYC

+0

別の問題があります。投手がメッセージを送信すると、キャッチャーはSystem.currentTimeMillis();投手よりも小さい。 – asdf

関連する問題