2017-10-20 14 views
1

RxJavaを使用するために私の(Android)アプリケーションでいくつかのロジックを変換しようとしていますが、もっと高度なロジックを実行する方法を考え出すのに苦労しています。RxJavaで既存のデータを更新して保持する

ユースケースは次のとおりです。私はある種のフィードをユーザーに表示したいと思います。フィードはさまざまなソースからの項目で構成されています。メッセージ、記事などが含まれます.APIの制限により、アプリ自体は個々のソースを集めて1つのリストに表示する必要があります。例えば

、フィードに私の項目を前提とは、以下のようにされています。現在、フィードは別のスレッド上に構築されて

class FeedItem { 
    Type feedItem; // Type of the item, e.g. article, message, etc. 
    ... 
} 

、およびUIは、フィードが更新された際、リスナーを使用して通知されます。どのように実行されたのかを知るために、以下にいくつかの(擬似的な)Javaコードを示します(わかりやすくするためにスレッドとその他の管理コードは省略しています)。

class FeedProducer { 
    List<FeedItem> currentData = new ArrayList(); 

    public void refreshData() { 
     for (FeedSource source: getFeedSources()) { 
      Type sourceType = source.getType(); 
      // Remove existing items 
      currentData.removeIf(item -> item.feedItem.equals(sourceType)); 
      List<FeedItem> newItems = source.produceItems(); 
      // Add the new items 
      currentData.addAll(newItems); 
      // Notify the UI things have changed 
      notifyDataChanged(currentData); 
     } 
     // Notify the UI we are done loading 
     notifyLoadingComplete(); 
    } 
} 

この方法refreshData()は、ユーザーがデータを更新しようとするたびに呼び出されます。この方法では、いくつかのソースのみを更新することができます(例えば、戻り値をgetFeedSources()に変更するなど)。

これらのソースは、アプリの他の部分でも個別に使用されています。私はそれらをObservablesに変換しました。これにより、作業が非常に簡単になりました。データベースが変更された場合、変更はObservableによって簡単にUIにプッシュされます。

私の質問は、これらのObservableソースを1つのObservableにどのようにマージすることができますか(以前の結果の「グローバル」状態がある場合)です。私はさまざまな結合演算子を調べましたが、必要なものが見つかりませんでした。私がRxJavaをかなり新しくしているので、私が何か明白なものを見落としていたら謝ります。

答えて

1

ナイーブなオプションは、呼び出し側が最後のリストを格納し、新しいデータを要求しているとき、それをパラメータとして与えて、次のようになります。

public class ReactiveMultipleSources { 

    // region Classes 
    public enum SourceType { 
     TYPE_ARTICLE, 
     TYPE_MESSAGE, 
     TYPE_VIDEO 
    } 

    public static class Feed { 
     private SourceType sourceType; 
     private String content; 

     Feed(SourceType sourceType, String content) { 
      this.sourceType = sourceType; 
      this.content = content; 
     } 

     SourceType getSourceType() { 
      return sourceType; 
     } 
    } 
    // endregion 

    public static void main(String[] args) throws InterruptedException { 
     final List<Feed>[] currentList = new List[]{new ArrayList()}; 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 
     System.out.println(); 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 


    } 

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) { 
     return Observable.fromIterable(getSourceTypes()) 
       .observeOn(Schedulers.io()) 
       // Get List<Feed> forEach sourceType 
       .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType) 
       .observeOn(Schedulers.computation()) 
       // Get list of "List of Feed for sourceType", = List<List<Feed>> 
       .toList() 
       .map(lists -> { 
        for (List<Feed> list : lists) { 
         SourceType sourceType = list.get(0).getSourceType(); 
         // Remove items of currentFeed whose sourceType has new List<Feed> 
         currentFeed.removeIf(temp -> temp.getSourceType() == sourceType); 
         // Add new items 
         currentFeed.addAll(list); 
        } 
        return currentFeed; 
       }) 
       .toObservable(); 
    } 

    // region Helper 
    private static List<SourceType> getSourceTypes() { 
     return new ArrayList<>(Arrays.asList(SourceType.values())); 
    } 

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) { 
     String content; 
     if (sourceType == SourceType.TYPE_ARTICLE) 
      content = "article "; 
     else if (sourceType == SourceType.TYPE_MESSAGE) 
      content = "message "; 
     else if (sourceType == SourceType.TYPE_VIDEO) 
      content = "video "; 
     else 
      content = "article "; 

     Feed feed1 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed2 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed3 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed4 = new Feed(sourceType, content + createRandomInt()); 

     return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4)); 
    } 

    // For simulating different items each time List<Feed> is required 
    private static int createRandomInt() { 
     return ThreadLocalRandom.current().nextInt(0, 21); 
    } 
    // endregion 
} 

出力例:

article 19 
article 15 
article 18 
article 18 
message 3 
message 2 
message 9 
message 1 
video 19 
video 17 
video 18 
video 11 

article 0 
article 4 
article 18 
article 15 
message 11 
message 16 
message 16 
message 4 
video 1 
video 7 
video 20 
video 2 
1

[0, 1],[10, 11]および[20, 21]を返す3つの個別のタスクがある場合は、それらを単一のリストにマージしたいとします。この場合、zip操作を使用できます。

public class TestRx { 
    public static void main(String[] args) { 
     // some individual observables. 
     Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1)); 
     Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11)); 
     Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21)); 

     Observable.zip(observable1, observable2, observable3, 
       new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() { 
        @Override 
        public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) { 
         // TODO: Remove existing items 

         // merge all lists 
         List<Integer> mergedList = new ArrayList<>(); 
         mergedList.addAll(list1); 
         mergedList.addAll(list2); 
         mergedList.addAll(list3); 
         return mergedList; 
        } 
       }) 
       .subscribe(new Observer<List<Integer>>() { 
        @Override 
        public void onNext(List<Integer> mergedList) { 
         System.out.println(mergedList); 
         // TODO: notifyDataChanged(mergedList) 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.out.println(throwable.toString()); 
         // TODO: handle exceptions 
        } 

        @Override 
        public void onCompleted() { 
         // TODO: notifyLoadingComplete() 
        } 
       }); 
    } 
} 

この結果、[0, 1, 10, 11, 20, 21]のように印刷されます。

関連する問題