2017-04-26 16 views
0

データを処理し、時には別のスレッドで実行されているリスナーインスタンスからのデータが必要なExecutorPoolでスレッドが7つ実行されています。リスナーはソケットを介してサーバーに要求を送信し、後で結果が返されると、リスナーはデータを呼び出したワーカースレッドに返します。私は、要求されたデータが返されるまでワーカースレッドをブロックしたいが、リスナーが他のワーカースレッドからの他の要求を行うのをブロックしたくない。それ、どうやったら出来るの?Javaスレッド化 - データが返されるのを待ちますが、他のスレッドをブロックしません

+0

クライアントとサーバーの接続方法を示すコードスニペットを提供できますか? – jeffkempf

+0

現在の実装の様子を示すには、コードをいくつか記述する必要があります。 – hagrawal

+0

リスナーはリクエストごとに新しいソケットを開くか、すべてのリクエストに対して1つのソケットを共有する必要がありますか? – bowmore

答えて

2

あるスレッドが別のスレッドに作業を渡した後、結果を待っているだけであれば、別のスレッドは必要ありません。あなたは仕事をするが、同じスレッドで呼び出されるクラスが必要な場合があります。また、同じインスタンスが複数のスレッドで使用されている場合は、同期が必要な場合があります。しかし、結論は次のとおりです。

リスナースレッドは不要です。要求を処理するコンポーネントに置き換えて、それを同期的に呼び出します。

編集

あなた自身の答えを考えると、あなたの問題は少し明確です。 @ JimNは、Futureをワーカースレッドに渡して、CompletableFutureにしたいと考えているので、レスポンスが返されるまで、リスナは要求IDによってキーを付けてMapに保持します。

サンプルコード:

public class WorkUnitProcessor implements Runnable { 

    // ... 

    @Override 
    public void run() { 
     while(true) { 
      WorkUnit work = master.getNextWorkUnit(); 
      if(work == null) return; 
      doWork(work); 
     } 
    } 

    public void doWork(WorkUnit work) { 

     //Do some work... 

     try { 
      DataRequest dataRequest = createRequest(work); 

      Future<Response> future = server.getData(dataRequest); 
      Response response = future.get();      // this call blocks until the Response is available. 

      //finish doing work 

     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } catch (ExecutionException e) { 
      // handle e.getCause() 
     } 

    } 

    // ... 
} 

public class Server implements DataSourceDrivenCallback { 

    private final DataSource dataSource; 

    private Map<Integer, CompletableFuture<Response>> openRequests = new ConcurrentHashMap<>(); 

    public Server(DataSource dataSource) { 
     this.dataSource = dataSource; 
    } 

    @Override 
    public void incomingDataCallback(int requestId, ChunkOfData requestedData) { 
     CompletableFuture<Response> responseHolder = openRequests.remove(requestId); // get the responseHolder 
     if (responseHolder != null) { 
      responseHolder.complete(toResponse(requestedData));      // make the response available. 
     } 
    } 

    public Future<Response> getData(DataRequest datarequest) { 
     int requestId = dataSource.submitRequest(serializeAndTranslateRequest(datarequest)); 
     CompletableFuture<Response> future = new CompletableFuture<>(); 
     openRequests.put(requestId, future); 
     return future; 
    } 

    // ... 
} 
+2

その場合、リクエストごとに 'CompletableFuture'を使うことができます。あなたはそのような先物の地図を、あなたの "チケット"をキーにして維持することができます。ブロックしようとするスレッドは 'future.get()'を呼び出し、結果を返すリスナスレッドは 'future.complete()'を呼び出します。 – JimN

+0

@JimN、私はそれを読むでしょう。問題を解決するためのきれいな方法のように聞こえる。ありがとう。 – PentiumPro200

0

私は、これはうまくいくかもしれないと思います。私はここで説明されて探していたもの:それが待機しているスレッドによって通知されるまで

https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html

これは、スレッドの睡眠を作る能力です。使いやすいと思われる。

public class DataProcessor { 
    private List<WorkUnit> work; 
    private Server server; 

    public DataProcessor(List<WorkUnit> work, int numprocessors) { 
     this.work = work; 
     setupProcessors(numprocessors); 
     Server server = new Server(); 
    } 

    private void setupProcessors(int numprocessors) { 
     for(int i = 0; i < numprocessors; i++) { 
      WorkUnitProcessor worker = new WorkUnitProcessor(this, server); 
      worker.start(); 
     } 
    } 

    public synchronized WorkUnit getNextWorkUnit() { 
     if(work.isEmpty()) return null; 
     return work.remove(0); 
    } 
} 

public class WorkUnitProcessor(Server server) { 
    private DataProcessor master; 
    private Server server; 

    public WorkUnitProcessor(DataProcessor master) { 
     this.master = master; 
    } 

    @Override 
    public void run() { 
     while(true) { 
      WorkUnit work = master.getNextWorkUnit(); 
      if(work == null) return; 
      doWork(work); 
     } 
    } 

    public void doWork(WorkUnit work) { 
     //Do some work... 
     server.getData(datarequest, this); 
     while(!datarequest.filled) { 
      try { 
       wait(); 
      } catch (InterruptedException e) {} 
     } 
     //finish doing work 
    } 
} 

public class Server implements DataSourceDrivenCallback { 
    private DataSource ds; 
    private Map<Integer, OpenRequest> openrequests; 


    public Server() { 
     //setup socket and establish communication with server through DataSource object 
     DataSource ds = new DataSource(<ID>, <Socket>); 
    } 

    public synchronized void getData(DataRequest datarequest, WorkUnitProcessor workerthread) { 
     int requestid = ds.submitRequest(serializeAndTranslateRequest(datarequest)); 
     openrequests.add(new OpenRequest(workerthread, datarequest)); 
    } 

    @Override 
    public void incomingDataCallback(int requestid, ChunkOfData requesteddata) { 
     OpenRequest request = openrequests.get(requestid); 
     request.datarequest.storeData(requesteddata); 
     request.workerthread.notify(); 
    } 
} 

public class OpenRequest { 
    private WorkUnitProcessor workerthread; 
    private DataRequest datarequest; 
    //other details about request 
} 
+0

私はそのサーバーをスレッドとみなしますが、そのrun()メソッドは何をしますか? – bowmore

+0

多分私はそれについて間違っていました...私はなぜサーバがスレッドである必要があるかわかりません。あなたが正しいと思います。ワーカースレッドは、実際にはgetData()メソッドを使用してタスクをDataSourceに送信します。次に、他のスレッドで実行されているコールバックがincomingDataCallbackを呼び出し、そのデータをワーカースレッドに渡します。私はそれを反映するために上記の私の以前の解決策を改訂しました。 – PentiumPro200

+0

私はCompletableFutureがあなたが探しているものだと思います。たとえServerがスレッドではない場合でも、私はそれを私の答えに追加します – bowmore

関連する問題