2016-09-25 20 views
16

しますRXJava - 私は次の操作を行い観測を作成する(例えば、バッファ、ウィンドウで)pausable観測可能

  • 彼らは、彼らがしている間
  • はすぐに、アイテムを発する一時停止している間、すべての項目をバッファリング
  • 一時停止/再開トリガーは別の観測から来る必要があります
  • メインスレッドで実行されないオブザーバブルで保存する必要があります。保存する必要があります。メインスレッドから一時停止/再開状態を変更する必要があります。

トリガーとしてBehaviorSubject<Boolean>を使用し、このトリガーをアクティビティーのonResumeおよびイベントにバインドします。 (コード例は、追加)

質問

私はセットアップに何かをしましたが、意図したとおり、それが機能していません。私は次のようにそれを使用します。

Observable o = ...; 
// Variant 1 
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) 
// Variant 2 
// o = o.compose(RXPauser.applyPauser(getPauser())); 
o 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); 

を現在の問題は、バリアント1が正常に動作しなければならないこと、であるが、時には、イベントは単に放出されない - バルブのすべてが動作するまでの弁は、発光されていない(かもしれスレッド問題...)!ソリューション2ははるかに単純で機能しているようですが、本当に良いかどうかは分かりませんが、そうは思わないです。私は実際には、なぜ解決策1が時々失敗しているのですか?解決策2が(現在私のために知られていない)問題を解決するかどうかわかりません...

誰かが問題か、シンプルなソリューションは確実に動作するはずですか?または、私に信頼できるソリューションを教えてください。

コード

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser機能

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) 
{ 
    return observable -> pauser(observable, pauser); 
} 

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) 
{ 
    // this observable buffers all items that are emitted while emission is paused 
    Observable<T> sharedSource = source.publish().refCount(); 
    Observable<T> queue = sharedSource 
      .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) 
      .flatMap(l -> Observable.from(l)) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); 

    // this observable emits all items that are emitted while emission is not paused 
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) 
      .switchMap(tObservable -> tObservable) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); 

    // combine both observables 
    return queue.mergeWith(window) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); 
} 

活性

public class BaseActivity extends AppCompatActivity { 

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); 

    public BaseActivity(Bundle savedInstanceState) 
    { 
     super(args); 
     final Class<?> clazz = this.getClass(); 
     pauser 
       .doOnUnsubscribe(() -> { 
        L.d(clazz, "Pauser unsubscribed!"); 
       }) 
       .subscribe(aBoolean -> { 
        L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); 
       }); 
    } 

    public PublishSubject<Boolean> getPauser() 
    { 
     return pauser; 
    } 

    @Override 
    protected void onResume() 
    { 
     super.onResume(); 
     pauser.onNext(true); 
    } 

    @Override 
    protected void onPause() 
    { 
     pauser.onNext(false); 
     super.onPause(); 
    } 
} 
+0

人々は疑問に非常に明確にしているという重要な要件で不足している、これまでのところ、あるのに役立ちます: _を」一時停止/再開トリガは別の観測可能な "_から来なければなりません。 彼らは固定された時間スケジュールを望まない。 –

答えて

3

あなたは、実際に、バッファリングを停止する際に定義し、観測可能渡しブックからサンプルを.buffer()演算子を使用することができます。

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) 
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) 
    .subscribe(System.out::println); 

第5章から、 '配列を飼いならす':https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

PublishSubjectObservableとして使用して、カスタム演算子で要素を入力することができます。バッファリングを開始する必要があるたびに、インスタンスを作成してObservable.defer(() -> createBufferingValve())

2

私はイベントをロギングするために同様のことをしました。
件名はいくつかのイベントを収集し、10秒間に1回はそれらをサーバーにプッシュします。

主なアイデアは、たとえば、クラスEventです。

public class Event { 

    public String jsonData; 

    public String getJsonData() { 
     return jsonData; 
    } 

    public Event setJsonData(String jsonData) { 
     this.jsonData = jsonData; 
     return this; 
    } 
} 

あなたはイベントのキューを作成する必要があります。

private PublishSubject<Event> eventQueue = PublishSubject.create(); 

それはBehaviorSubjectすることができ、それはあなたがサーバーにイベントをプッシュ処理するた、ロジックを作成する必要があります次に

問題ではありません。 :

eventObservable = eventQueue.asObservable() 
      .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds 
      .toList() 
      .doOnNext(new Action1<List<Event>>() { 
       @Override 
       public void call(List<Event> events) { 
        apiClient.pushEvents(events);  //push your event 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { 
       @Override 
       public Observable<List<Event>> call(Throwable throwable) { 
        return null; //make sure, that on error will be never called 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.io()); 

次に、サブスクライブしてサブスクリプションを保持する必要がありますription、あなたがそれを必要としないまで:

eventSubscription = eventObservable.subscribe() 

ホームこれは、この質問に答えるためにしようと

関連する問題