2017-05-15 10 views
0

クライアントからRESTful要求を受け取り、特定のデバイスに新しいスレッド(UDPパケット)で送信するサーバーアプリケーションを開発しています。さらに、実行の開始時にサーブレットリスナによって起動された別のスレッドを実行します。このスレッドは、システムのすべてのデバイスから送信されたUDPパケットをリッスンします。マルチスレッドアプリケーションの特定のスレッドに通知する方法

クライアントが特定のデバイスから要求を行う場合、RESTサービスは、UDPパケットがデバイスに送信されるスレッドを起動し、応答を待機する必要があります。 UDPサーバが最終的にそのデバイスからパケットを受信すると(パケットからipをチェックする)、ブロックされたスレッドにその実行を継続して終了する必要があることを通知する必要があります。

私はwait()notify()notifyAll()メソッドを使用して考えた、しかし、多くのスレッドが複数のデバイスの応答を待ってブロックすることができるように、私は私が唯一の希望のスレッド(1 ブロック解除に知らせることができるかが表示されません応答デバイス上で要求を行った)。その方法を使ってこれを行う方法はありますか?他のアプローチ?

SocketServletListener:

public class SocketServletListener implements ServletContextListener { 

    private UDPServer server; 
    private ServletContext context; 

    @Override 
    public void contextInitialized(ServletContextEvent sce) { 
     context = sce.getServletContext(); 
     server = new UDPServer(); 
     server.start(); 
    } 

    @Override 
    public void contextDestroyed(ServletContextEvent sce) { 
     context = sce.getServletContext(); 
     server.interrupt(); 
    } 

} 

UDPServer:

public class UDPServer extends Thread { 

    private SocketUDPCommunication comm; 


    public UDPServer() { 
     comm = new SocketUDPCommunication(); 
    } 

    @Override 
    public void run() { 

     DatagramPacket response; 
     try { 
      comm.setPort(Utils.UDP_SERVER_PORT); 
      comm.createSocket(); 

      while (!Thread.currentThread().isInterrupted()) { 
       try { 
        response = comm.receiveResponse(); 
       } catch (SocketTimeoutException e) { 
        continue; 
       }       
       InetAddress ip = response.getAddress(); 
       int port = response.getPort(); 

       byte[] byteSend = comm.discardOffset(response); 

       //TODO notify thread which made the request for the responding device (identified by ip) 

      } 
     } catch (IOException e) { 
      System.err.println("Unable to process client request: " + e.getMessage()); 
     } catch (IllegalArgumentException ex) { 
      System.err.println("Illegal Argument: " + ex.getMessage()); 
     } finally { 
      comm.closeConnection(); 
     } 
    } 

    @Override 
    public void interrupt() { 
     super.interrupt(); 
     comm.closeConnection(); 
    } 
} 

DataSend.java:

@Path("dataSend") 
public class DataSend { 

    @Context 
    private UriInfo context; 

    public DataSend() { 
    } 

    @POST 
    @Consumes(MediaType.APPLICATION_JSON) 
    public Response postJson(ForceStatus status) { 

     new TestExecution(status).start(); 

     return Response.status(Response.Status.OK).build();  
    } 
} 

TestExecution:

public class TestExecution extends Thread { 
    private ForceStatus status; 

    public ExamExecution(ForceStatus status) { 
     this.status = status; 
    } 

    @Override 
    public void run() { 
     ProtocolStatus p = new ProtocolStatus(); 
     byte[] s = p.createResponseFrame(status.getForce()); 

     List<Integer> executedTest = new ArrayList<>(); 

     //Simple UDP client 
     UDPClient client = new UDPClient(); 
     . 
     . 
     . 
     //t is a pojo which contains the data from a battery of tests 
     while(!executedTest.contains(t.getTestId())) { 

      client.send(status.getIp(), status.getPort(), s); 
      //TODO wait until UDPServer thread gets the response from the device 

      executedTest.add(t.getTestId()); 

      nextTest = t.getNextTestId(); 

      t = getEntity(nextTest); 
     }  
    } 
} 
0ここでいくつかのコード(簡略化)であります
+0

http://stackoverflow.com/questions/43289395/how-to-notifyかなり複雑です-a-specific-thread-in-java? – Nathan

+0

[このリンク](http://stackoverflow.com/questions/289434/how-to-make-a-java-thread-wait-for-another-threads-output)をご覧ください。 Answer – Nathan

+0

[Javaスレッドが別のスレッドの出力を待つようにする方法は?](http://stackoverflow.com/questions/289434/how-to-make-a-java-thread-wait-for-another-スレッド出力) – Nathan

答えて

0

私はこれを次のように解決しました:

最初に、異なるスレッドによって共有される要求のリストを管理するシングルトンクラスを作成しました。

public class SharedWaitingThreads { 
    private ArrayList<ResponseToWait> queue; 
    private static SharedWaitingThreads mySharedWaitingThreads; 

    private SharedWaitingThreads() { 
     queue = new ArrayList<>(); 
    } 

    public static SharedWaitingThreads getInstance() { 
     if(mySharedWaitingThreads == null) 
      mySharedWaitingThreads = new SharedWaitingThreads(); 

     return mySharedWaitingThreads; 
    } 

    public ArrayList<ResponseToWait> getQueue() { 
     return queue; 
    } 

    public void setQueue(ArrayList<ResponseToWait> queue) { 
     this.queue = queue; 
    } 

    public void waitForAnswer(ResponseToWait r) throws InterruptedException { 
     System.out.println("Petición registrada " + r.toString()); 
     synchronized(mySharedWaitingThreads) { 
      mySharedWaitingThreads.getQueue().add(r); 
      while(mySharedWaitingThreads.getQueue().contains(r)) {   
       mySharedWaitingThreads.wait(); 
      } 
     } 
    } 



    public ResponseToWait answerWaitingThread(ResponseToWait r, boolean compareSeqNum) { 
     ResponseToWait rw = null; 
     synchronized(mySharedWaitingThreads) { 
      for(ResponseToWait rwAux : mySharedWaitingThreads.getQueue()) { 
       if(rwAux.equals(r)) { 
        rw = rwAux; 
        mySharedWaitingThreads.getQueue().remove(rwAux); 
        //every time a thread is released, notify to release the lock 
        mySharedWaitingThreads.notifyAll(); 
        break; 
       } 
      } 
     } 
     return rw; 
    } 
} 

このシングルトンインスタンスは(contextInitializedによる)、メインスレッドによって起動され、その作業を継続するために応答を待つために必要なすべてのスレッドで共有されます。 ResponseToWaitには、各要求/スレッドのすべての必要な情報が含まれています。 equals方法は、(私の場合、私はIPと要求の種類によって比較)

public class ExamExecution extends Thread { 

    private SharedWaitingThreads waitingThreads; 
    private static volatile Thread myThread; 

    public ExamExecution(SharedWaitingThreads waitingThreads) { 
     this.waitingThreads = waitingThreads; 
    } 

    @Override 
    public void start() { 
     myThread = new Thread(this); 
     myThread.start(); 
    } 

    @Override 
    public void run() { 
     Thread thisThread = Thread.currentThread(); 
     ProtocolStatus p = new ProtocolStatus(); 
     UDPClient client = new UDPClient(); 
     if(status.getWorkingMode() == WorkingMode.CONTINUE_EXECUTION) { 
      byte[] frameRestart = p.createResponseFrame(status.getWorkingMode()); 
      client.send(getIp(), getPort(), frameRestart); 
      //send the frame and block the thread until the server gets the proper response 
      try { 
       waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frameRestart)); 
      } catch (InterruptedException ex) { 
       Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     }else 
     if(status.getForce() == ForceStatus.START) {     
      //get data from database and initialize variables  
      . 
      . 
      . 

      while(!executedTest.contains(testInExam.getTestId()) && myThread != null) { 
       int attempts = 0; 
       res = false; 
       seqNumber = this.seqNumber.getValue(); 
       while(!res && (attempts < 3)) { 
        TestMemoryMap map = new TestMemoryMap(testInExam.getTestId()); 
        byte[] frameConfig = pc.createConfigFrame(Utils.ID_RTU, (byte)1, (byte)0, 
         Utils.MEM_MAP_VERSION, (byte)0, map.getMemoryMap().length, seqNumber, map.getMemoryMap()); 

        res = client.send(getIp(), getPort(), frameConfig); 

        if(res) { 
         try { 
          System.out.println(Thread.currentThread().getName() + " blocked waiting config answer"); 
          waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_CONFIG, frameConfig)); 
         } catch (InterruptedException ex) { 
          Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex); 
         } 
        } 
        attempts++; 
       } 
       System.out.println("Config frame received:" + res); 

       if(res) { 
        byte[] frame = p.createResponseFrame(status.getWorkingMode()); 
        client.send(getIp(), getPort(), frame); 

        try { 
         System.out.println(Thread.currentThread().getName() + " blocked waiting end execution answer"); 
         waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frame)); 
        } catch (InterruptedException ex) { 
         Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex); 
        }    
       } 
       //add test to list of executed tests 
       executedTest.add(testInExam.getTestId()); 
       nextTest = testInExam.getNextTestInExamId(); 
       if(nextTest != 0) { 
        testInExam = daot.getEntity(nextTest); 
        testId = testInExam.getTestId(); 
       } 
      } 
     } else if(status.getForce() == ForceStatus.END) { 
      System.out.println("Stopping..."); 
      //abort the execution of the thread 
      this.endExecution(); 

     } 
    } 

    private void endExecution() { 
     synchronized(myThread) { 
      this.myThread = null; 
     } 
    } 
} 

UDPサーバ・スレッドは具体的には、スレッドを待って、受信した応答に応じて、答える必要があります必要な機能に適応するためにオーバーライドされます。私はそれがよりシンプルかつ教訓にしようとするコードを単純化

public class UDPServer extends Thread { 

    private SocketUDPCommunication comm; 
    private UDPClient udpClient; 
    private SharedWaitingThreads waitingThreads; 

    public UDPServer(SharedWaitingThreads waitingThreads) { 
     comm = new SocketUDPCommunication(); 
     udpClient = new UDPClient(); 
     this.waitingThreads = waitingThreads; 
    } 


    @Override 
    public void run() { 
     DatagramPacket response; 
     try { 
      comm.setPort(Utils.UDP_SERVER_PORT); 
      comm.createSocket(); 

      while (!Thread.currentThread().isInterrupted()) { 
       System.out.println("Waiting for clients to connect on port:" + comm.getSocket().getLocalPort()); 
       try { 
        response = comm.receiveResponse(); 
       } catch (SocketTimeoutException e) { 
        continue; 
       }       
       InetAddress ip = response.getAddress(); 
       int port = response.getPort(); 

       byte[] byteSend = comm.discardOffset(response); 

       byte[] header = new byte[Utils.STD_HEADER_SIZE]; 
       Utils.getCleanHeader(byteSend, header); 
       byte type = header[12]; 

       ResponseToWait r1; 
       if(type == Utils.TYPE_CONFIG_REPORT) { 
        ProtocolConfig pc = new ProtocolConfig(); 
        pc.parseFrame(byteSend); 
        int mapType = pc.getPayload()[0]; 
        int idElement = pc.getPayload()[1]; 
        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_CONFIG, null); 
        if(checkPendingRequests(r1, null)) { 
         System.out.println("Resending config"); 
         continue; 
        } 
        waitingThreads.answerWaitingThread(r1, true); 
       }else if(type == Utils.TYPE_STATUS_REPORT) { 
        ProtocolStatus protocol = new ProtocolStatus(); 

        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS); 
        if(checkPendingRequests(r1, statusTest)) continue; 
        byte[] frame; 
        if(statusTest.equals(StatusTest.FINALIZED)) { 
         System.out.println("Test finalized. Waking threads"); 
         r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS, null); 
         //Free possible waiting threads 
         ResponseToWait res1 = waitingThreads.answerWaitingThread(r1, false); 

        } 
       } 
      } 
     } catch (IOException e) { 
      System.err.println("Unable to process client request: " + e.getMessage()); 
     } catch (IllegalArgumentException ex) { 
      System.err.println("Illegal Argument: " + ex.getMessage()); 
     } catch (InterruptedException ex) { 
      Logger.getLogger(UDPServer.class.getName()).log(Level.SEVERE, null, ex); 
     } finally { 
      comm.closeConnection(); 
     } 
    } 

    private boolean checkPendingRequests(ResponseToWait rw, StatusTest status) { 
     boolean resend = false; 
     System.out.println("Status: " + status); 
     synchronized(waitingThreads) { 
      for(ResponseToWait r : waitingThreads.getQueue()) { 
       if(r.getResponseType() == Utils.TYPE_CONFIG && r.getIp().equals(rw.getIp())) { 
        udpClient.send(r.getIp(), r.getPort(), r.getFrame()); 
        resend = true; 
       } 
       if(r.getResponseType() == Utils.TYPE_STATUS && r.getIp().equals(rw.getIp())){ 
        udpClient.send(r.getIp(), r.getPort(), r.getFrame()); 
        resend = true; 
       } 
      } 
     } 
     return resend; 
    } 

    @Override 
    public void interrupt() { 
     super.interrupt(); 
     comm.closeConnection(); 
    } 

} 

通知は、実際のケースは

関連する問題