2017-06-16 4 views
0

テーブルaとbの変更を聞くためにsqlbriteを使いました。 combineLatest演算子を使用して、sqlbriteによって生成された観測値を結合します。 BiFunctionプロセスではobservableAとObservableBの放出アイテムが返されます。BiFunctionのバックプレッシャーに似たRxjava2アプローチ

private CompositeDisposable mSubscriptions = new CompositeDisposable(); 
private void initialize(){ 
    QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null); 
    QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null); 
    ResourceSubscriber subscriber = Flowable.combineLatest(
      RxJavaInterop.toV2Observable(observableA 
        .mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST) 
      , 
      RxJavaInterop.toV2Observable(observableB 
        .mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST) 
      , new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() { 
       @Override 
       public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception { 
        List<ResultItem> resultItems = convertToResultItems(aItems, bItems); // long process here, convert aItems and bItems to resultItems 
        return resultItems; 
       } 
      } 
    ) 
      .onBackpressureLatest() 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeWith(new ResourceSubscriber<List<ResultItem>>() { 
       @Override 
       public void onNext(List<ResultItem> resultItems) { 
        adapter.addData(resultItems); 
       } 

       @Override 
       public void onError(Throwable t) { 
       } 

       @Override 
       public void onComplete() { 
       } 
      }); 
    mSubscriptions.add(subscriber); 
} 

質問: BiFunctionは、実行時間が長すぎる場合(例えば10秒)、BiFunctionをリードすること、観測トリガ間隔よりも長い(例えば観測が1秒ごとにトリガーする)ことを理由Iの不必要な作業を行います最新の放出アイテムのみが必要ですが、BiFunctionは放出アイテムを1つずつ処理するので、BiFunctionは古い放出アイテムを処理するため、処理する必要はありません。 私はBiFunctionで古い放出アイテムをスキップし、BiFunctionで完了したapply()ごとに最新の放出アイテムを処理して、リソースの無駄を減らし、時間を節約したいと思っています。 rxjavaには、BiFunctionやこの問題を解決する他の方法のバックプレッシャに似たアプローチがありますか?

図は、現在および予想されるBiFunctionタイムラインを示しています。 figure link

この問題を解決するには2つの方法がありますが、欠点があります。

Method1: "aItems"と "bItems"をペアで結合し、その参照をswitchMapに渡してジョブを処理します。

Flaw:switchMapは最新のアイテムだけをサブスクライバに送信しますが、不要な作業は行います。

方法2:は、 "aItems"と "bItems"を組み合わせて、onNextへの参照を渡してジョブを処理します。

欠陥:UIスレッドをブロックしました。

+0

まずあなたが廃棄作業を定義する基準を特定する必要があります。 –

+0

BiFunctionが最新ではないアイテムとプロセスを受け取った場合、無駄な作業を意味する – KpSt

答えて

0

あなただけに沿ってcombineLatestのコンバイナ機能の値のペアを渡し、元のソーススレッドオフ計算を配置するobserveOnを使用することができます。

.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b)) 
.onBackpressureLatest() 
.observeOn(Schedulers.computation(), false, 1) 
.map(pair -> compute(pair)) 
.observeOn(AndroidSchedulers.mainThread()) 
... 
+0

計算メソッドを計算スレッドに置きますが、マップ関数は観測アイテムから放出されたアイテムを1つずつ処理しますが、マップ関数havn't古いアイテムを削除します。 – KpSt

+0

バッファーレベル1をobserveOnに追加してこの例を更新しました。 – akarnokd

+0

ありがとうございました。それは働く。 – KpSt

関連する問題