2016-05-19 9 views
4

私は非常に標準的なページ区切り問題を持っていますが、これは簡単な再帰で扱うことができます。ページ区切り再帰のない観測可能な結果 - RxJava

public Observable<List<Result>> scan() { 
    return scanPage(Optional.empty(), ImmutableList.of()); 
} 

private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) { 
    return this.scanner.scan(startKey, LIMIT) 
      .flatMap(page -> { 
       if (!page.getLastKey().isPresent()) { 
        return Observable.just(results); 
       } 
       return scanPage(page.getLastKey(), ImmutableList.<Result>builder() 
         .addAll(results) 
         .addAll(page.getResults()) 
         .build() 
       ); 
      }); 
} 

しかし、これは明らかに大規模な呼び出しスタックを作成する可能性があります。これを必然的に行うことができますが、観察可能なストリームを維持するにはどうすればよいですか?ここで

は不可欠遮断する例を示します

public List<Result> scan() { 
    Optional<String> startKey = Optional.empty(); 
    final ImmutableList.Builder<Result> results = ImmutableList.builder(); 

    do { 
     final Page page = this.scanner.scan(startKey); 
     startKey = page.getLastKey(); 
     results.addAll(page.getResults()); 
    } while (startKey.isPresent()); 

    return results.build(); 
} 

答えて

1

それは、ソリューションの最もエレガントではないのですが、あなたは被験者と副作用を使用することができます。下記のおもちゃの例を参照してください。

import rx.Observable; 
import rx.Subscriber; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.HashMap; 
import rx.subjects.*; 

public class Pagination { 
    static HashMap<String,ArrayList<String>> pages = new HashMap<String,ArrayList<String>>(); 

    public static void main(String[] args) throws InterruptedException { 
     pages.put("default", new ArrayList<String>()); 
     pages.put("2", new ArrayList<String>()); 
     pages.put("3", new ArrayList<String>()); 
     pages.put("4", new ArrayList<String>()); 

     pages.get("default").add("2"); 
     pages.get("default").add("Maths"); 
     pages.get("default").add("Chemistry"); 

     pages.get("2").add("3"); 
     pages.get("2").add("Physics"); 
     pages.get("2").add("Biology"); 

     pages.get("3").add("4"); 
     pages.get("3").add("Art"); 

     pages.get("4").add(""); 
     pages.get("4").add("Geography"); 



     Observable<List<String>> ret = Observable.defer(() -> 
     { 
      System.out.println("Building Observable"); 
      ReplaySubject<String> pagecontrol = ReplaySubject.<String>create(1); 
      Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey -> 
      { 
       if (!aKey.equals("")) { 
        return Observable.just(pages.get(aKey)).doOnNext(page -> pagecontrol.onNext(page.get(0))); 
       } else { 
        return Observable.<List<String>>empty().doOnCompleted(()->pagecontrol.onCompleted()); 
       } 
      }); 
      pagecontrol.onNext("default"); 
      return ret2; 
     }); 
     // Use this if you want to ensure work isn't done again 
     ret = ret.cache(); 
     ret.subscribe(l -> System.out.println("Sub 1 : " + l)); 
     ret.subscribe(l -> System.out.println("Sub 2 : " + l)); 
     Thread.sleep(2000L); 
    } 
} 

改善のため編集されています。

+0

申し訳ありませんが、私はそれがうまくいくとは思いません。結果は設定されたスキャンの最初の結果になりますが、メイン実行スレッドのフローによってcurrentKeyがまだ設定されていないことがわかり、終了します。 –

+0

あなたが正しいように見えます。問題は鍵です。例えば、それを生成するための規則的な方法がない場合。 "1"、 "2"などとすると、ページ数が多すぎると独自の問題がある再帰的な解決策に悩まされていると思います。 – JohnWowUs

+0

件名と副作用を使用したハックなソリューションの編集済みの回答を参照してください。 – JohnWowUs

1

別のアプローチは、トークンストリームを使用することです:トークンが空になるまで結果が

public Observable<Window> paging() { 

     Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized(); 

     tokenStream.onNext(Token.startToken()); 

     Observable<Window> dataStream = 
       Observable.defer(() -> tokenStream.first().flatMap(this::remoteData)) 
         .doOnNext(window -> tokenStream.onNext(window.getToken())) 
         .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore)); 

     return dataStream; 
    } 

で一度新鮮なリモートデータが得られ、ストリーム、および再サブスクライブするために、最初のトークンのためのデータを取得し、次のトークンをプッシュ

Window{next token=Token{key='1'}, data='data for token: Token{key=''}'} 
Window{next token=Token{key='2'}, data='data for token: Token{key='1'}'} 
Window{next token=Token{key='3'}, data='data for token: Token{key='2'}'} 
Window{next token=Token{key='4'}, data='data for token: Token{key='3'}'} 
Window{next token=Token{key='5'}, data='data for token: Token{key='4'}'} 
Window{next token=Token{key='6'}, data='data for token: Token{key='5'}'} 
Window{next token=Token{key='7'}, data='data for token: Token{key='6'}'} 
Window{next token=Token{key='8'}, data='data for token: Token{key='7'}'} 
Window{next token=Token{key='9'}, data='data for token: Token{key='8'}'} 
Window{next token=Token{key='10'}, data='data for token: Token{key='9'}'} 

ジャストアイデア、なぜwouldntのコピーpastableサンプル

public class RxPaging { 

    public Observable<Window> paging() { 

     Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized(); 

     tokenStream.onNext(Token.startToken()); 

     Observable<Window> dataStream = 
       Observable.defer(() -> tokenStream.first().flatMap(this::remoteData)) 
         .doOnNext(window -> tokenStream.onNext(window.getToken())) 
         .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore)); 

     return dataStream; 
    } 

    private Observable<Window> remoteData(Token token) { 
     /*limit number of pages*/ 
     int page = page(token); 
     Token nextToken = page < 10 
       ? nextPageToken(token) 
       : Token.endToken(); 

     return Observable 
       .just(new Window(nextToken, "data for token: " + token)) 
       .delay(100, TimeUnit.MILLISECONDS); 
    } 

    private int page(Token token) { 
     String key = token.getKey(); 
     return key.isEmpty() ? 0 : Integer.parseInt(key); 
    } 

    private Token nextPageToken(Token token) { 
     String tokenKey = token.getKey(); 
     return tokenKey.isEmpty() ? new Token("1") : nextToken(tokenKey); 
    } 

    private Token nextToken(String tokenKey) { 
     return new Token(String.valueOf(Integer.parseInt(tokenKey) + 1)); 
    } 

    public static class Token { 
     private final String key; 

     private Token(String key) { 
      this.key = key; 
     } 

     public static Token endToken() { 
      return startToken(); 
     } 

     public static Token startToken() { 
      return new Token(""); 
     } 

     public String getKey() { 
      return key; 
     } 

     public boolean hasMore() { 
      return !key.isEmpty(); 
     } 

     @Override 
     public String toString() { 
      return "Token{" + 
        "key='" + key + '\'' + 
        '}'; 
     } 
    } 


    public static class Window { 
     private final Token token; 
     private final String data; 

     public Window(Token token, String data) { 
      this.token = token; 
      this.data = data; 
     } 

     public Token getToken() { 
      return token; 
     } 

     public String getData() { 
      return data; 
     } 

     @Override 
     public String toString() { 
      return "Window{" + 
        "next token=" + token + 
        ", data='" + data + '\'' + 
        '}'; 
     } 
    } 

    @Test 
    public void testPaging() throws Exception { 
     paging().toBlocking().subscribe(System.out::println); 
    } 
} 
+0

私はこれが好きです。私はそれを実装しようとしていないが、私はJohnWowUsのソリューションでSubjectの使用とどのように比較するのだろうか? –

+0

どちらもヘルパートークンストリームを使用します - 違いはデータストリーム "ラウンドロビン"が中止される方法です。主題に関して - ジョンの解答では無制限のリプレイ主題が使われているが、それは問題であるかもしれないし、問題ではないかもしれないが、推定ページ数に依存する。 BehaviorSubjectで安全に置き換えることができます(私は彼がTypoを作成し、ReplaySubject.createWithSize(1)の代わりにReplaySubject.create(1)を使用したと思います - これはBehaviorSubjectと同じです) –

-4

自分のページを反復してオブザーバーを作る独自のiterableを実装しますか?

例:

Observable.from(new Iterable<T>() { 

     @Override 
     public Iterator<T> iterator() { 
      return new Iterator<T>() { 
       @Override 
       public boolean hasNext() { 
        return hasNextPage(currentPageKey); 
       } 

       @Override 
       public T next() { 

        page = getNextPage(currentPageKey); 
        currentPageKey = page.getKey(); 
        return page; 
       } 

       @Override 
       public void remove() { 
        throw new UnsupportedOperationException(); 
       } 
      }; 
     } 
    }); 

よりエレガントな方法あなたのページの管理者(私は信じているあなたのコード例ではスキャナ変数)を作るのiterableを実装し、そこに反復ロジックを記述することです。

+0

これは、同期IterableおよびObservableでラップします。事実、問題は 'getNextPage'がObservableを返すため、Iteratorは実際にhasNextかどうかを知ることができません。 –

1

JohnWowUsの回答は素晴らしいですし、再帰を効果的に回避する方法を理解するのに役立ちましたが、まだ混乱していた点がいくつかありましたので、私の修正版を投稿しています。

概要:

  • 個々のページはSingleとして返されます。
  • Flowableを使用して、ページに含まれる各アイテムをストリーミングします。つまり、私たちの機能を呼び出す人は、個々のページについて知る必要はなく、含まれているアイテムだけを収集することができます。
  • BehaviorProcessorを使用して最初のページから開始し、次のページが利用可能であれば、現在のページでチェックした後で各後続のページを取得します。
  • キーは、processor.onNext(int)への呼び出しが次の繰り返しを開始することです。

このコードはrxjavareactive-streamsに依存します。

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.Optional; 
import java.util.function.Function; 

import io.reactivex.Flowable; 
import io.reactivex.Single; 
import io.reactivex.processors.BehaviorProcessor; 

public class Pagination { 

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function 
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) { 
     // Processor issues page indices 
     BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0); 
     // When an index number is issued, fetch the corresponding page 
     return processor.concatMap(index -> fetchPage.apply(index).toFlowable()) 
         // when returning the page, update the processor to get the next page (or stop) 
         .doOnNext(page -> { 
          if (page.hasNext()) { 
           processor.onNext(page.getNextPageIndex()); 
          } else { 
           processor.onComplete(); 
          } 
         }) 
         .concatMapIterable(Page::getElements); 
    } 

    public static void main(String[] args) { 
     fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println); 
    } 

    // A function to fetch a page of our paged data 
    private static Single<Page<String>> examplePageFetcher(int index) { 
     return Single.just(pages.get(index)); 
    } 

    // Create some paged data 
    private static ArrayList<Page<String>> pages = new ArrayList<>(3); 

    static { 
     pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1))); 
     pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2))); 
     pages.add(new Page<>(Arrays.asList("five"), Optional.empty())); 
    } 

    static class Page<T> { 
     private List<T> elements; 
     private Optional<Integer> nextPageIndex; 

     public Page(List<T> elements, Optional<Integer> nextPageIndex) { 
      this.elements = elements; 
      this.nextPageIndex = nextPageIndex; 
     } 

     public List<T> getElements() { 
      return elements; 
     } 

     public int getNextPageIndex() { 
      return nextPageIndex.get(); 
     } 

     public boolean hasNext() { 
      return nextPageIndex.isPresent(); 
     } 
    } 
} 

出力:

one 
two 
three 
four 
five 
関連する問題