2016-07-02 14 views
1

私たちはマイクロサービスアーキテクチャを採用しており、ネットワーク上でサービス間の呼び出しを行います。 トップレベルのサービスでRxJavaを使用しています。その結果、ボトムサービスへの並列リクエストが大量に生成されています。 このため、「No Route to Hostエラー」または「接続エラー」が表示されます。 その目的のために、RxJava Observableからの放出を遅くしたいので、新しい接続を作成する前に早期の接続が閉じられるようにします。 は、以下のサンプルコードです:あなたはジッパーを使用し、最初のObservable.fromに放出されるすべてのアイテムがX時間の間隔で行くことを組み合わせることができ、特定のステップを遅らせるためにRxJavaで観測可能な放射を遅らせる方法

package com.demo.rxjava.rxjaxa.creation; 
    import rx.Observable; 
    import rx.Subscriber; 
    import rx.schedulers.Schedulers; 

    public class Delay { 

     public static void main(String[] args) throws InterruptedException { 
      Observable.just(1, 2, 3, 4, 5).subscribeOn(Schedulers.io()) 
        .flatMap(integer -> { 
         return function1(integer); 
        }).observeOn(Schedulers.io()) 
        .subscribe(new Subscriber<String>() { 
         @Override 
         public void onNext(String item) { 
          System.out.println("Next: " + item); 
         } 

         @Override 
         public void onError(Throwable error) { 
          System.err.println("Error: " + error.getMessage()); 
         } 

         @Override 
         public void onCompleted() { 
          System.out.println("Sequence complete."); 
         } 
        }); 
     } 

    public Observable<String> function1(String id) { 
       // This is where we make network call 
       Observable<Response> response = Rx.newClient(RxObservableInvoker.class) 
         .target("http://example.com/resource") 
         .request() 
         .queryParam("id", id) 
         .rx() 
         .get(); 
       response.obserOn(Schedulers.from(threadExecutor)).flatMap(response->{ 
        return response.extractResponse(); 
       }); 
    } 
} 

答えて

0

/** 
* If we want to delay the every single item emitted in the pipeline we will need a hack, 
* one possible hack is use zip operator and combine every item emitted with an interval so every item emitted has to wait until interval emit the item. 
*/ 
@Test 
public void delay() { 
    long start = System.currentTimeMillis(); 
    Subscription subscription = Observable.zip(Observable.from(Arrays.asList(1, 2, 3)), Observable.interval(200, TimeUnit.MILLISECONDS), (i, t) -> i) 
              .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start))); 
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(3000, TimeUnit.MILLISECONDS); 
} 

これは

time:537 
    time:738 
    time:936 

よりpracticleここで例https://github.com/politrons/reactive

+0

これは放射の開始を遅らせるだけです、私は特定の間隔で観測可能なそれぞれの放射を遅らせたいと思います。 –

+0

私の新しい応答を確認してください – paul

+0

それは動作しますが、実動コードでは背圧の誤差が紛失しています。 –

0

むしろあなたの要求を遅らせるよりも、あなたが並列活動を制限がSchedulerで発生下のサービスへの要求を持っている必要があります印刷します。例:

int maxParallel = 4; 
Scheduler scheduler = Schedulers.from(
    Executors.newFixedThreadPool(maxParallel)); 
... 
observable 
    .flatMap(x -> 
     submitToBottomService(x) 
     .subscribeOn(scheduler)) 
    .subscribe(subscriber); 

ところで、あなたは接続を閉じると言います。 Observable.using演算子は、反応的なコンテキストでリソースを閉じるために設計されています(終了時および終了時にリソースを閉じます)。あなたがまだそれを使用していない場合は、それを見てください。

+0

スケジューラでボトムサービスコールを実行していますが、ボトムサービスへの非同期コールを作成しているため、最初のコールから結果が返される前に多くの接続が作成されています。 –

+0

スケジューラを使用して非同期呼び出しを行う必要があります。コードを追加しますか? –

+0

私は元のコードを共有することはできませんが、質問をsudoコードで更新しました –

関連する問題