2017-03-29 5 views
1

私は、couchbaseサーバーからデータを一括取得する方法を以下に記述しました。私は、クエリRxJavaとcouchbaseで並べ替えを使用するには?

"SELECT meta().id as id FROM bucket" 

を渡すとき

bucket.async() 
      .query(N1qlQuery.simple(query)) 
      .doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t))) 
      .flatMap(AsyncN1qlQueryResult::rows) 
      .flatMap(row -> 
      bucket.async(). 
      get(row.value().getString("id"))) 
      .map(JsonDocument::content). 
      toList() 
      .toBlocking() 
      .single(); 

このコードは正常に動作しているが、私は取得しています結果がソートされていない

"SELECT meta().id as id FROM bucket order by id ASC" 

のようなものを使用する場合。しかし、同じクエリをクエリコンソールで実行すると、結果は期待どおりになります。それは私がrxJavaで何か間違っていると信じさせます。これを解決するのを手伝ってください。

答えて

5

flatMap()演算子のために順序が失われます。これは、並行ストリームを適用しますが、順序は維持しません。で終了しますその後、

bucket.async(). 
     get(row.value().getString("id"))) 

get操作のそれぞれを:あなたは、あなたが作成し、行ごとに意味を、各onNext()のための新しいObservableに加入しているflatMap()を適用すると、あなたはパラレルこの行で実行している
別の時間、取り出されたコンテンツは順序付けられていない状態で放出される。

順序を維持したいが並列性を失わないようにするには、アクティブストリームを1つだけ維持するconcatMap()を使用し、各フェッチ操作を順番にサブスクライブします。

並列処理が必要な場合は、concatMapEager()を使用して、作成されたObservableをそれぞれ並列に実行しますが、順番にその項目を出力します。

+2

私はそれに同意します、concatMapEagerは私が考える方法です。ビュー "&include docs"については既にこれを行います。https://github.com/couchbase/couchbase-java-client/blob/master/src/main/java/com/couchbase/client/java/view/ViewQueryResponseMapperを参照してください。 .java#L220あなたが詳細に興味があれば。 – daschl

関連する問題