2016-11-01 9 views
0

私のソースオブザーバブルを変更して、アプリケーションステータスがENABLEDとDISABLEDの間で切り替わるときに切断して再接続するようにしました。repeat完了をトリガーしたイベントを基にして

しかし、私のsourceObservable自体がcompleteを呼び出す状況がある場合、アプリケーションのステータスがENABLEDの場合、このトランスのrepeatWhenによってすぐに再接続されると思われます。

おそらくtakeUntilとrepeatWhenをグループ化して上から来る完全なイベントをキャッチしないようにすることで、これを解決するエレガントな方法がありますか?

N.B. applicationStatusObservableは要求に応じて最後の値を繰り返します。

ヘルプははるかに高く評価される:)

/** 
* Transforms the source observable so that it defers initial subscription until the service becomes available, 
* unsubscribes when the service becomes unavailable, and resubscribes when the service becomes available again. 
*/ 
public class AvailabilityTransformer<T> implements Observable.Transformer<T, T> { 

    private final Observable<ApplicationStatus> applicationStatusObservable; 

    AvailabilityTransformer(final Observable<ApplicationStatus> applicationStatusObservable) { 
     this.applicationStatusObservable = applicationStatusObservable; 
    } 

    @Override 
    public Observable<T> call(final Observable<T> sourceObservable) { 
     final Observable<ApplicationStatus> applicationEnabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus == ENABLED); 
     final Observable<ApplicationStatus> applicationDisabledObservable = 
       applicationStatusObservable.filter(applicationStatus -> applicationStatus != ENABLED); 
     return sourceObservable 
       .takeUntil(applicationDisabledObservable) // Unsubscribe whenever the application is disabled 
       .repeatWhen(repeatObservable -> repeatObservable.flatMap(repeat -> 
         applicationEnabledObservable.flatMap(applicationStatus -> just(repeat)))) // Resubscribe when enabled again 
       .delaySubscription(applicationEnabledObservable.first()); // Delay the initial subscription until the application is first enabled 
    } 
} 
+0

applicationStatusObservableはホットまたはコールドですか?では、AvailabilityTransformerで何を達成しようとしていますか?私はそれを正しく取得しません。 –

+0

applicationStatusObservableは、リプレイ(1).refCount()でコールドです。いずれの場合でも、常に現在のアプリケーションステータスを返します。オブザーバーがサブスクライブしている場合は、アプリケーションステータスの変更の通知を受け取ります。 考えられるのは、アプリケーションが無効になっている場合、ソースオブザーバブルサブスクリプションをキャンセルすることです。再び有効になると、観測可能なソースに再登録されます。 –

+0

OperatorTakeUntilとOnSubscribeDelayなどの実装を調べると、TransformerではなくOperatorとしてこれを書く方がおそらくもっと適していると思います。 –

答えて

0

彼、

それはあなたが何をしているかを理解するために私にいくつかの時間がかかりました。私は最終的にそれを得たと思います。アプリケーションが有効か無効か(true/false)をサブスクライバに通知するisApplicationAvailableがあります。 observable isApplicationAvailableがtrueを返す場合、あなたはいくつかの作業(ホット/コールド)を行い、値を生成するだけの他のオブザーバブルを持っています。何らかの理由でアプリケーションが無効になると、オブザーバブルは値を生成してはなりません。

私はテストのためにRxJava2を使用します。

この例では、observable isApplicationActiveがtrueを返す場合、stringObservableはサブスクライバに値を伝えます。

@Test 
public void name() throws Exception { 
    Subject<Boolean> isApplicationActive = BehaviorSubject.<Boolean>create() 
      .toSerialized(); 

    Observable<Boolean> isApplicationActiveObservable = isApplicationActive 
      .hide() 
      .doOnNext(s -> System.out.println("isApplicationActive: " + s)); 

    isApplicationActive.onNext(false); 

    Thread.sleep(1_000); 

    Observable<String> stringObservable = Observable.interval(1_000, TimeUnit.MILLISECONDS) 
      .map(Objects::toString) 
      .doOnNext(s -> System.out.println("NextIntervalValue")) 
      .compose(createSwitchMapCompose(isApplicationActiveObservable)); 

    stringObservable.subscribe(s -> { 
     System.out.println("stringObservable: " + s); 
    }); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(2_000); 

    isApplicationActive.onNext(true); 

    Thread.sleep(6_000); 

    isApplicationActive.onNext(false); 

    Thread.sleep(20_000); 
} 

private ObservableTransformer<String, String> createSwitchMapCompose(Observable<Boolean> isApplicationActiveObservable) { 
    return upstream -> upstream.switchMap(s -> isApplicationActiveObservable.take(1).flatMap(aBoolean -> { 
       if (aBoolean) { 
        return Observable.just(s); 
       } 
       return Observable.empty(); 
      }) 
    ); 
} 
関連する問題