私たちはマイクロサービスアーキテクチャを採用しており、ネットワーク上でサービス間の呼び出しを行います。 トップレベルのサービスでRxJavaを使用しています。その結果、ボトムサービスへの並列リクエストが大量に生成されています。 このため、「No Route to Hostエラー」または「接続エラー」が表示されます。 その目的のために、RxJava Observableからの放出を遅くしたいので、新しい接続を作成する前に早期の接続が閉じられるようにします。 は、以下のサンプルコードです:あなたはジッパーを使用し、最初のObservable.fromに放出されるすべてのアイテムがX時間の間隔で行くことを組み合わせることができ、特定のステップを遅らせるためにRxJavaで観測可能な放射を遅らせる方法
package com.demo.rxjava.rxjaxa.creation;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class Delay {
public static void main(String[] args) throws InterruptedException {
Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io())
.flatMap(integer -> {
return function1(integer);
}).observeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
}
public Observable<String> function1(String id) {
// This is where we make network call
Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
.target("http://example.com/resource")
.request()
.queryParam("id", id)
.rx()
.get();
response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{
return response.extractResponse();
});
}
}
これは放射の開始を遅らせるだけです、私は特定の間隔で観測可能なそれぞれの放射を遅らせたいと思います。 –
私の新しい応答を確認してください – paul
それは動作しますが、実動コードでは背圧の誤差が紛失しています。 –