2017-08-10 12 views
0

複数のObservableオブジェクトからの完了したスレッド応答を集計してクライアントに返す必要があるユースケースがあります。私の質問は、rX Javaを使用してそれを達成する方法です。ここではコードスニペットを書いていますが、この問題はタイムアウト後に何も返されないことです。完了したスレッドを集約してタイムアウト後にレスポンスを送信するJava

Observable<AggregateResponse> aggregateResponse = Observable. 
    zip(callServiceA(endpoint), callServiceB(endpoint), callServiceC(endpoint), 
     (Mashup resultA, Mashup resultB, Mashup resultC) -> { 
      AggregateResponse result = new AggregateResponse(); 
      result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName()); 
      return result; 
     }).timeout(5, TimeUnit.SECONDS); 

あなたは各Observabletimeoutオペレータを配置する必要があり、加入者

aggregateResponse.subscribe(new Subscriber<AggregateResponse>() { 
     @Override 
     public void onCompleted() { 
     } 

     @Override 
     public void onError(Throwable throwable) { 
      //Timeout execute this rather than aggregating the finished tasks 
      System.out.println(throwable.getMessage()); 
      System.out.println(throwable.getClass()); 
     } 

     @Override 
     public void onNext(AggregateResponse response) { 
      asyncResponse.resume(response); 
     } 
    }); 

答えて

1

すべての観測結果を放出する前に値を放出するためにそれらの一方のみが長い時間がかかるので、もし、zipは、お待ちしておりますすでに放出されている他の人は、timeoutonError)でストリームをカットして、Zip Observableで放出することができます。

残りの部分を残してタイムアウトしたソースを無視したい場合、それぞれObservableにタイムアウト演算子を追加し、それぞれにonErrorReturnのようなエラー処理を追加すると、エラーリターンは何らかの '空の」結果(あなたがRxJava2でnullを使用することはできません)、そしてあなたは集計結果は、これらの空の結果を無視する場合:私は解決策を見つけ

Observable<AggregateResponse> aggregateResponse = Observable. 
      zip(callServiceA(endpoint) 
          .timeout(5, TimeUnit.SECONDS) 
          .onErrorReturn(throwable -> new Mashup()), 
        callServiceB(endpoint) 
          .timeout(5, TimeUnit.SECONDS) 
          .onErrorReturn(throwable -> new Mashup()), 
        callServiceC(endpoint) 
          .timeout(5, TimeUnit.SECONDS) 
          .onErrorReturn(throwable -> new Mashup()), 
        (Mashup resultA, Mashup resultB, Mashup resultC) -> { 
         AggregateResponse result = new AggregateResponse(); 
         result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName()); 
         return result; 
        }); 
+0

唯一の問題は、それがスレッド終了実行まで待つことです。例えば。 30秒後にスレッド終了が実行された場合、30秒以上経過しても応答が得られます。 – Eranda

+0

これはzipの動作です。combineLatestを使用すると、すぐに各エミッションの更新を取得できます – yosriz

+0

combineLatestでも同じように動作します。 – Eranda

関連する問題