私はこれを次のように解決しました:
最初に、異なるスレッドによって共有される要求のリストを管理するシングルトンクラスを作成しました。
/**
 * Singleton registry of requests whose sender threads are blocked until a
 * matching response is reported.
 *
 * <p>The instance's own monitor guards the queue and is also the condition
 * that waiting threads sleep on. Code that iterates the list returned by
 * {@link #getQueue()} must synchronize on this instance while doing so.
 */
public class SharedWaitingThreads {

    /** Pending requests; guarded by this instance's monitor. */
    private ArrayList<ResponseToWait> queue;

    /** Lazily created sole instance; guarded by the class monitor. */
    private static SharedWaitingThreads mySharedWaitingThreads;

    private SharedWaitingThreads() {
        queue = new ArrayList<>();
    }

    /**
     * Returns the sole instance, creating it on first use.
     *
     * <p>Synchronized so that two threads racing on the first call cannot
     * each build their own instance (and then wait/notify on different
     * monitors).
     *
     * @return the shared instance, never {@code null}
     */
    public static synchronized SharedWaitingThreads getInstance() {
        if (mySharedWaitingThreads == null) {
            mySharedWaitingThreads = new SharedWaitingThreads();
        }
        return mySharedWaitingThreads;
    }

    /**
     * Returns the live list of pending requests (not a copy).
     *
     * @return the internal queue; hold this instance's monitor while reading
     *         or modifying it
     */
    public synchronized ArrayList<ResponseToWait> getQueue() {
        return queue;
    }

    /**
     * Replaces the list of pending requests.
     *
     * @param queue the new backing list
     */
    public synchronized void setQueue(ArrayList<ResponseToWait> queue) {
        this.queue = queue;
        // Waiters test membership in the current list, so let them re-check
        // against the replacement instead of sleeping on a stale condition.
        notifyAll();
    }

    /**
     * Registers {@code r} and blocks the calling thread until the queue no
     * longer contains an element equal to it.
     *
     * @param r the request to register and wait on
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting; the request is unregistered before the exception
     *         propagates
     */
    public void waitForAnswer(ResponseToWait r) throws InterruptedException {
        System.out.println("Petición registrada " + r.toString());
        synchronized (this) {
            queue.add(r);
            try {
                // Loop guards against spurious wake-ups and against
                // notifyAll() calls issued for other requests.
                while (queue.contains(r)) {
                    wait();
                }
            } catch (InterruptedException e) {
                // Do not leave a request behind that nobody is waiting on.
                removeSameInstance(r);
                throw e;
            }
        }
    }

    /**
     * Removes the first queued request equal to {@code r} and wakes every
     * waiting thread so each can re-check its own condition.
     *
     * @param r the response descriptor to match with {@code equals}
     * @param compareSeqNum currently ignored; kept for caller compatibility
     * @return the queued request that was removed, or {@code null} if none
     *         matched
     */
    public ResponseToWait answerWaitingThread(ResponseToWait r, boolean compareSeqNum) {
        synchronized (this) {
            for (int i = 0; i < queue.size(); i++) {
                ResponseToWait candidate = queue.get(i);
                if (candidate.equals(r)) {
                    // Remove by index so exactly the matched element goes.
                    queue.remove(i);
                    notifyAll();
                    return candidate;
                }
            }
        }
        return null;
    }

    /** Removes the entry that is the very same object as {@code r}, if present. */
    private void removeSameInstance(ResponseToWait r) {
        for (int i = 0; i < queue.size(); i++) {
            if (queue.get(i) == r) {
                queue.remove(i);
                return;
            }
        }
    }
}
このシングルトンインスタンスはメインスレッドによって(contextInitialized 内で)生成され、処理を続けるために応答を待つ必要があるすべてのスレッドで共有されます。
ResponseToWait には、各要求/スレッドに必要な情報がすべて含まれています。
equals メソッドは、必要な機能に合わせてオーバーライドしています
(私の場合は、IP と要求の種類で比較しています)。
/**
 * Worker that sends protocol frames over UDP and blocks on
 * {@link SharedWaitingThreads} until each frame has been answered.
 *
 * <p>This is a simplified listing: {@code status}, {@code getIp()},
 * {@code getPort()} and the variables initialised in the elided section are
 * declared outside the code shown here.
 */
public class ExamExecution extends Thread {

    /** Shared registry this worker blocks on while awaiting responses. */
    private SharedWaitingThreads waitingThreads;

    /**
     * Thread most recently launched by {@link #start()}; {@code null} acts as
     * the stop signal for the test loop. The field is static, so it is shared
     * by every instance of this class.
     */
    private static volatile Thread myThread;

    /**
     * @param waitingThreads registry used to block until a response arrives
     */
    public ExamExecution(SharedWaitingThreads waitingThreads) {
        this.waitingThreads = waitingThreads;
    }

    /**
     * Runs {@link #run()} on a fresh wrapper thread and records that thread
     * in {@code myThread}, instead of starting this {@code Thread} object
     * itself.
     */
    @Override
    public void start() {
        myThread = new Thread(this);
        myThread.start();
    }

    /**
     * Depending on {@code status}, either re-sends the current status frame
     * and waits for its answer, runs the chain of tests, or requests a stop.
     * An interrupt while waiting ends the run with the interrupt flag set.
     */
    @Override
    public void run() {
        ProtocolStatus p = new ProtocolStatus();
        UDPClient client = new UDPClient();
        if (status.getWorkingMode() == WorkingMode.CONTINUE_EXECUTION) {
            byte[] frameRestart = p.createResponseFrame(status.getWorkingMode());
            client.send(getIp(), getPort(), frameRestart);
            //send the frame and block the thread until the server gets the proper response
            try {
                waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frameRestart));
            } catch (InterruptedException ex) {
                Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        } else if (status.getForce() == ForceStatus.START) {
            //get data from database and initialize variables
            // (the three placeholder lines below stand for code omitted from this listing)
            .
            .
            .
            while (!executedTest.contains(testInExam.getTestId()) && myThread != null) {
                int attempts = 0;
                res = false;
                seqNumber = this.seqNumber.getValue();
                // Up to three attempts to get the config frame sent.
                while (!res && (attempts < 3)) {
                    TestMemoryMap map = new TestMemoryMap(testInExam.getTestId());
                    byte[] frameConfig = pc.createConfigFrame(Utils.ID_RTU, (byte) 1, (byte) 0,
                            Utils.MEM_MAP_VERSION, (byte) 0, map.getMemoryMap().length, seqNumber, map.getMemoryMap());
                    res = client.send(getIp(), getPort(), frameConfig);
                    if (res) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " blocked waiting config answer");
                            waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_CONFIG, frameConfig));
                        } catch (InterruptedException ex) {
                            Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                            // Stop instead of sending further frames: with the
                            // flag restored every later wait would fail at once.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    attempts++;
                }
                System.out.println("Config frame received:" + res);
                if (res) {
                    byte[] frame = p.createResponseFrame(status.getWorkingMode());
                    client.send(getIp(), getPort(), frame);
                    try {
                        System.out.println(Thread.currentThread().getName() + " blocked waiting end execution answer");
                        waitingThreads.waitForAnswer(new ResponseToWait(getIp(), getPort(), Utils.TYPE_STATUS, frame));
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExamExecution.class.getName()).log(Level.SEVERE, null, ex);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                //add test to list of executed tests
                executedTest.add(testInExam.getTestId());
                nextTest = testInExam.getNextTestInExamId();
                if (nextTest != 0) {
                    testInExam = daot.getEntity(nextTest);
                    testId = testInExam.getTestId();
                }
            }
        } else if (status.getForce() == ForceStatus.END) {
            System.out.println("Stopping...");
            //abort the execution of the thread
            this.endExecution();
        }
    }

    /**
     * Clears {@code myThread}, which makes the test loop in {@link #run()}
     * exit at its next condition check.
     *
     * <p>The field is volatile, so a plain write is enough. Locking on the
     * field's current value would throw {@code NullPointerException} once it
     * has already been cleared.
     */
    private void endExecution() {
        myThread = null;
    }
}
UDP サーバースレッドは、具体的には、受信した応答に応じて待機中のスレッドに応答する(待機を解除する)役割を担っており、必要な機能に合わせてメソッドをオーバーライドしています。コードは、より単純で分かりやすくなるように簡略化してあります。
/**
 * Thread that receives UDP datagrams and, depending on the frame type read
 * from the header, either re-sends pending requests to the sender's address
 * or releases the thread waiting on the matching request.
 *
 * <p>This is a simplified listing; {@code statusTest} is not declared in the
 * code shown here.
 */
public class UDPServer extends Thread {

    private SocketUDPCommunication comm;
    private UDPClient udpClient;
    /** Registry of blocked requests; its monitor guards the queue. */
    private SharedWaitingThreads waitingThreads;

    /**
     * @param waitingThreads registry whose waiting threads this server answers
     */
    public UDPServer(SharedWaitingThreads waitingThreads) {
        comm = new SocketUDPCommunication();
        udpClient = new UDPClient();
        this.waitingThreads = waitingThreads;
    }

    /**
     * Receive loop: runs until this thread is interrupted or an exception
     * ends it, then closes the connection.
     */
    @Override
    public void run() {
        DatagramPacket response;
        try {
            comm.setPort(Utils.UDP_SERVER_PORT);
            comm.createSocket();
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("Waiting for clients to connect on port:" + comm.getSocket().getLocalPort());
                try {
                    response = comm.receiveResponse();
                } catch (SocketTimeoutException e) {
                    // Nothing arrived in time; loop to re-check the interrupt flag.
                    continue;
                }
                InetAddress ip = response.getAddress();
                int port = response.getPort();
                byte[] byteSend = comm.discardOffset(response);
                byte[] header = new byte[Utils.STD_HEADER_SIZE];
                Utils.getCleanHeader(byteSend, header);
                // Byte 12 of the header is read as the frame type.
                byte type = header[12];
                ResponseToWait r1;
                if (type == Utils.TYPE_CONFIG_REPORT) {
                    ProtocolConfig pc = new ProtocolConfig();
                    pc.parseFrame(byteSend);
                    // NOTE(review): mapType and idElement are not used in this
                    // simplified listing - confirm whether they can be dropped.
                    int mapType = pc.getPayload()[0];
                    int idElement = pc.getPayload()[1];
                    r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_CONFIG, null);
                    if (checkPendingRequests(r1, null)) {
                        System.out.println("Resending config");
                        continue;
                    }
                    waitingThreads.answerWaitingThread(r1, true);
                } else if (type == Utils.TYPE_STATUS_REPORT) {
                    // NOTE(review): protocol is not used in this simplified listing.
                    ProtocolStatus protocol = new ProtocolStatus();
                    r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS);
                    // NOTE(review): statusTest is declared outside this listing.
                    if (checkPendingRequests(r1, statusTest)) {
                        continue;
                    }
                    if (statusTest.equals(StatusTest.FINALIZED)) {
                        System.out.println("Test finalized. Waking threads");
                        r1 = new ResponseToWait(ip.getHostAddress(), port, Utils.TYPE_STATUS, null);
                        //Free possible waiting threads
                        waitingThreads.answerWaitingThread(r1, false);
                    }
                }
            }
        } catch (IOException e) {
            System.err.println("Unable to process client request: " + e.getMessage());
        } catch (IllegalArgumentException ex) {
            System.err.println("Illegal Argument: " + ex.getMessage());
        } catch (InterruptedException ex) {
            Logger.getLogger(UDPServer.class.getName()).log(Level.SEVERE, null, ex);
            // Restore the flag so the interruption is not lost to callers.
            Thread.currentThread().interrupt();
        } finally {
            comm.closeConnection();
        }
    }

    /**
     * Re-sends the stored frame of every pending CONFIG or STATUS request
     * whose IP equals that of {@code rw}.
     *
     * @param rw request describing the sender of the datagram just received
     * @param status only printed for diagnostics; may be {@code null}
     * @return {@code true} if at least one frame was re-sent
     */
    private boolean checkPendingRequests(ResponseToWait rw, StatusTest status) {
        boolean resend = false;
        System.out.println("Status: " + status);
        // Hold the registry monitor while iterating its live queue.
        synchronized (waitingThreads) {
            for (ResponseToWait r : waitingThreads.getQueue()) {
                boolean resendable = r.getResponseType() == Utils.TYPE_CONFIG
                        || r.getResponseType() == Utils.TYPE_STATUS;
                if (resendable && r.getIp().equals(rw.getIp())) {
                    udpClient.send(r.getIp(), r.getPort(), r.getFrame());
                    resend = true;
                }
            }
        }
        return resend;
    }

    /**
     * Interrupts the thread and closes the connection.
     * NOTE(review): the close is presumably there to unblock a pending
     * receive - confirm against SocketUDPCommunication.
     */
    @Override
    public void interrupt() {
        super.interrupt();
        comm.closeConnection();
    }
}
なお、実際のケースはかなり複雑です:
http://stackoverflow.com/questions/43289395/how-to-notify-a-specific-thread-in-java – Nathan
[このリンク](http://stackoverflow.com/questions/289434/how-to-make-a-java-thread-wait-for-another-threads-output)をご覧ください。 Answer – Nathan
[Javaスレッドが別のスレッドの出力を待つようにする方法は?](http://stackoverflow.com/questions/289434/how-to-make-a-java-thread-wait-for-another-threads-output) – Nathan