複数のObservableオブジェクトからの完了したスレッド応答を集計してクライアントに返す必要があるユースケースがあります。私の質問は、rX Javaを使用してそれを達成する方法です。ここではコードスニペットを書いていますが、この問題はタイムアウト後に何も返されないことです。完了したスレッドを集約してタイムアウト後にレスポンスを送信するJava
Observable<AggregateResponse> aggregateResponse = Observable.
zip(callServiceA(endpoint), callServiceB(endpoint), callServiceC(endpoint),
(Mashup resultA, Mashup resultB, Mashup resultC) -> {
AggregateResponse result = new AggregateResponse();
result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
return result;
}).timeout(5, TimeUnit.SECONDS);
あなたは各Observable
にtimeout
オペレータを配置する必要があり、加入者
aggregateResponse.subscribe(new Subscriber<AggregateResponse>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
//Timeout execute this rather than aggregating the finished tasks
System.out.println(throwable.getMessage());
System.out.println(throwable.getClass());
}
@Override
public void onNext(AggregateResponse response) {
asyncResponse.resume(response);
}
});
唯一の問題は、それがスレッド終了実行まで待つことです。例えば。 30秒後にスレッド終了が実行された場合、30秒以上経過しても応答が得られます。 – Eranda
これはzipの動作です。combineLatestを使用すると、すぐに各エミッションの更新を取得できます – yosriz
combineLatestでも同じように動作します。 – Eranda