2017-11-08 20 views
0

私はscala observablesをcouchbaseから取得するためにmap、flatMap、zipを使用して結果を変換しています。問題は、アイテムがcouchbaseに存在しない場合、たとえば.zipはonCompleteだけ呼び出されないことです。例:oncompleteのみが呼び出されたときのscala rx observables上のマッピング

import rx.lang.scala._ 

def getIdsWithValues(ids: Seq[String]): Map[K, V] = { 
    val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id)) 
    values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id. 
    ... 
} 

は、だから私は望んでいた:

  1. 戻るkのマップ - > V
  2. は私が返さVにkが(私はVのようなものであることが予想.zipのカップルをみましょう存在しない場合None
  3. を私はジップがまったく項目がDBに存在しない場合は呼び出されませんでした見た。
私はのrunnin後に考え

上記のコードでは、idsの入力パラメータをスキャンし、zipで値がzipされていないものについては、その値にidを追加するが、別のフローを追加するようなものであるため、zipに既存の行と存在しない行の両方を処理させたい。

どうすればよいですか? .zipは、既存の行と存在しない行の両方をどのように処理できますか?

答えて

1

zip()演算子は使用しないでください。代わりに、flatMap()materialize().take(1)を使用してください。 materialize()onComplete()イベントをNotificationに変換し、Noneにマップできます。Notificationの値はSome(value)にマッピングされます。

def getIdsWithValues(ids: Seq[String]): Map[K, V] = { 
    val values = Observable.from(keyValueIds) 
    .flatMap(id => couchbaseBucket.async() 
    .get(id) 
    .materialize() 
    .take(1) 
    .map(res => if (res.isOnComplete()) 
        (id, None) 
        else 
        (id, Some(res.getValue)) 
    ... 
} 
+0

'.materialize()。取る(1).MAP(RES =>もし(res.isOnComplete())(ID、なし)他(ID、いくつかの(res.getValueが))'置き換えることができますちょうど '.headOption'によって。 –