2016-12-19 12 views
0

私は、レトロフィットサービスを利用しているアプリケーションにデータレイヤーを持っています。 (これまでのところ、ネットワーク上でのみ永続性があります。さらに開発を進めると、オフラインで最初にローカルストレージを追加します)RxJava2を使用した購読者の再クエリーと更新

Retrofitはサーバに電話をかけたときにObservable<List<Item>>を返します。これはうまくいく。私の購読者では、購読するとリストを受け取り、Itemsで自分のUIを設定することができます。

私が抱えている問題は、リストが(一部の外部メカニズムによって)変更されている場合、観察可能な再クエリサービスを改造してアイテムの新しいリストを出す方法です。私は、データが古くなっていることに気づくでしょうが、私はどのように再クエリを開始するかわかりません。ここで

は、あなたがここに戦っている問題は、ホット冷たい観察可能な違いである私のDataManager

class DataManager { 

    // Retrofit 
    RetrofitItemsService itemsService; 

    // The observalble provided by retrofit 
    Observable<List<Item>> itemsObservable; 

    //ctor 
    public DataManager(RetrofitItemsService itemsService) { 
     this.itemsService = itemsService; 
    } 

    /* Creates and stores an observable if one has not been created yet. 
    * Returns the observable so that it can be subscribed to by the function caller 
    */ 
    public Observable<List<Item>> getItems(){ 
     if(itemsObservable == null){ 
      itemsObservable = itemsService.getItems(); 
     } 

     return itemsObservable; 
    } 

    /* Adds a new Item to the list. 
    */ 
    public Completable addItem(Item item){ 
     Completable call = itemsService.addItem(item); 

     call.subscribe(()->{ 
      /* 
      < < <Here> > > 
      If someone has previously called getItems before this item was added, they now have stale data. 

      How can I call something like: 

      itemsObservable.refreshAllSubscribers() 
      */ 
     }); 

     return call; 
    } 
} 

答えて

2

のダウン切り詰めバージョンです。違いを詳細に説明するグッド記事がたくさんありますので、基本について簡単に説明しましょう。観察可能なコールド

すべての加入者のための新しいプロデューサーを作成します。つまり、2人の別個の加入者がに加入しているときは、同じ寒さで観察可能ながそれぞれ、これらの排出の異なるインスタンスを受信します。彼らは等しいかもしれませんが(!)、それほど異質なものではありません。ここにあなたのケースに適用されると、各サブスクライバは独自のプロデューサを取得し、サーバはデータを要求してストリームに渡します。すべての加入者は、それ自体のプロデューサからのデータを提供されます。

ホット観測可能な株式のすべてのオブザーバーとプロデューサー。例えば、プロデューサがオブジェクトの集合を反復している場合、排出の途中で第2のサブスクライバと一緒にジャンプすることは、後で放出されるアイテムのみを得ることを意味する(replayのような演算子によって変更されない場合)。任意の加入者によって受信されたすべてのオブジェクトは、単一のプロデューサからのものと同じように、すべてのオブザーバで同じインスタンスです。

このように見えることで、あなたのデータを配布するために熱心な観察者がいなければならないので、あなたはその熱い観察可能なもので一度だけそれを放出し、 。

幸いにも、寒い目に見えるものを暑いものに変えるのは大したことではありません。この振る舞いを模倣する独自のプロデューサを作成するか、shareのような一般的な演算子の1つを使用するか、ストリームを1つのように動作させるように変換することができます。

私はデータをリフレッシュし、このように、元の冷たい観測可能でそれをマージするためのPublishSubjectを使用して助言する:

class DataManager { 

    ..... 

    PublishSubject<Boolean> refreshSubject = PublishSubject.create(); 

    // The observable for retrieving always fresh data 
    Observable<List<Item>> itemsObservable; 

    //ctor 
    public DataManager(RetrofitItemsService itemsService) { 
     this.itemsService = itemsService; 
     itemsObservable = itemsService.getItems() 
           .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems())) 
    } 


    public Observable<List<Item>> getItems(){ 
     return itemsObservable; 
    } 

    /* Adds a new Item to the list. 
    */ 
    public Completable addItem(Item item){ 
     Completable call = itemsService.addItem(item); 

     call.subscribe(()->{ 
      refreshSubject.onNext(true); 
     }); 

     return call; 
    } 
} 
1

私はitemsService.getItems()は、単一の要素を返し推測Observableので、消費者が新鮮なデータを取得するために再登録する必要がありますとにかく彼らはRetrofit Observablesが遅れている/怠けているので、それを得るでしょう。

あなたが持っている可能性がありますが、トリガすることができPublishSubjectの助けを経由して、Observable独立し、「長い」データ変更時:

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized(); 

public Observable<Object> itemsChanged() { 
    return onItemsChanged; 
} 

public Completable addItem(Item item){ 
    Completable call = itemsService.addItem(item); 

    // prevent triggering the addItem multiple times 
    // Needs RxJava 2 Extensions library for now 
    // as there is no Completable.cache() or equivalent in 2.0.3 
    CompletableSubject cs = CompletableSubject.create(); 

    call.doOnComplete(() -> onItemsChanged.onNext("changed")) 
    .subscribe(cs); 

    return cs; 
} 
関連する問題