2017-08-17 6 views
1

ことなく選択されたソース観測を中断するためのパターンは、私はホット無限ソース観測の数を有しています。これらからの排出は統合され、下流のオブザーバーによって処理されます。この処理の結果は、非監視中のObservableが引き続き放出すると予想され、オブザーバがNon-Suspended Emissionを消費すると予想される一方で、Source Observableの一部からの放出を一時的に中断するために使用される必要がある。熱くなっている中断中に発生したイベントは、無視しても間違いなく安全に監視できます。RxJava:一時ステートフル変数大きいRxJavaアプリケーションで

私が今までに思い付くことができた唯一の解決策は、グローバルなステートフル変数でフィルタを適用することでした。下のコードは原則を示しています。わかりやすくするために、私はObservableをソースとするロジックを中断してwhileループに移動し、単に中断/実行の決定をランダムに割り当てます。また、ソース観測は、単純な間隔に置き換えられている(実際のアプリケーションでは、イベントがランダムであり、観測に包まれている外部ソースから来る)

boolean is1running = true; 
boolean is2running = true; 
boolean is3running = true; 

public void multiStream() { 

    Observable<String> ob1 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB1::" + s) 
      .filter(s -> keepRunning(1)); 

    Observable<String> ob2 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB2::::" + s) 
      .filter(s -> keepRunning(2)); 

    Observable<String> ob3 = Observable 
      .interval(100, TimeUnit.MILLISECONDS) 
      .map(s -> "OB3:::::" + s) 
      .filter(s -> keepRunning(3)); 

    Observable<String> finalObs = Observable.merge(ob1, ob2, ob3); 

    finalObs.subscribe(s -> System.out.println(s)); 

    Random randomGenerator = new Random(); 

    while(true) 
    { 
     sleep(1000); 
     is1running = randomGenerator.nextBoolean(); 
     is2running = randomGenerator.nextBoolean(); 
     is3running = randomGenerator.nextBoolean(); 
    } 
} 

private boolean keepRunning(int i) { 
    switch(i) 
    { 
     case 1: return is1running; 
     case 2: return is2running; 
     case 3: return is3running; 
    } 

    return true; 
} 

コードが動作しているようですが、私は満足していませんグローバルなステートフル変数を使用することについて

機能的および反応的なパラダイムにも適合するこのような状況に適したパターンがありますか?

答えて

0

このような状況では、私はよくswitchMap()演算子を使用します。たとえば、booleanが各ソースに対して観測可能であると仮定して、ob1Switch,ob2Switch、およびob3Switchとします。その後、

Observable<String> ob1 = Observable 
     .interval(100, TimeUnit.MILLISECONDS) 
     .map(s -> "OB1::" + s); 

ob1Switchによって制御されます:あなたはob1を使用するために使用されるところはどこでも今

Observable<String> ob1Switchable = ob1Switch 
    .switchMap(switchOn -> switchOn ? ob1 : Observable.never()); 

、あなたはグローバルな状態を必要とせずに、ob1Switchableを使用することができます。 ob1をオンにしたい場合は、ob1Switchを現在のob1Switch.onNext(Boolean.TRUE)で最新の状態に保つ必要があります。

編集:フィードバックループを有するチェーンの例として:代わりにそれらの定義では

Observable<Integer> eventsPerSecond = 
    Observable.merge(ob1Switchable, ob2Switchable, ob3Switchable) 
    .buffer(1, TimeUnit.SECONDS) 
    .map(buf -> buf.size()); 
Observable<Boolean> obs1Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > LOW_THRESHOLD); 
Observable<Boolean> obs2Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > MEDIUM_THRESHOLD); 
Observable<Boolean> obs3Switch = eventsPerSecond 
    .map(eps -> Boolean.valueOf(eps > HIGH_THRESHOLD); 

、あなたはイベントの持続可能率を有するまで連続して発生源をシャットダウンするフィードバックループを持つことができます。

+0

はい、私は一般的なロジックを見ていますが、まずob1Switchを定義しますか? –