2017-12-11 9 views
0

Scala Mongo Driverを使用してMongoの基準に一致するすべてのレコードを取得しようとしています。スカラモンゴのドライバが将来の結果を得る

val MaxBuffer: Long = 100 
var docs: Queue[Document] = Queue.empty 
var sub: Option[Subscription] = None 

val q: Observable[Document] 

def fetchMoreRecords: Unit = sub.get.request(MaxBuffer) 

q.subscribe(new Observer[Document] { 

    override def onSubscribe(subscription: Subscription): Unit = { 
    sub = Some(subscription) 
    fetchMoreRecords 
    } 

    override def onError(e: Throwable): Unit = fail(out, e) 

    override def onComplete(): Unit = { 
    println("Stream is complete") 
    complete(out) 
    } 

    override def onNext(result: Document): Unit = { 
    if (doc.size == maxBuffer) { 
     fail(out, new RuntimeException("Buffer overflow")) 
    } else { 
     docs = docs :+ result 
    } 
    } 

}) 

(このコードは不完全である)私のような機能を必要とするでしょう

:完了

def isReady: Future[Boolean] = {} 

の観測を使用して

は、サブスクリプションを作成することによって、ストリームにアクセスすることができますonNextが少なくとも1回呼び出されたとき。これを行うには 悪い方法は次のようになります。

def isReady: Future[Boolean] = { 
    Future { 
     def wait: Unit = { 
      if (docs.nonEmpty) { 
       true 
      } else { wait } 
     } 
     wait 
    } 
} 

これを達成するための最良の方法だろうか?

答えて

0

あなたはPromiseを使用したい:

val promise = Promise[Boolean]() 
... 
override def onNext() = { 
    ... 
    promise.tryComplete(Success(true)) 
} 
override def onError(e: Throwable) = 
    promise.tryComplete(Failure(e)) 



val future = promise.future 

あなたは何の結果が存在しない場合を処理するために何かをしなければならない(それが今であるとして、未来が満たされることはありません...

+0

私が試しました –

+0

このアプローチについて、あなたは「面白くない」と感じているのですか? – Dima

+0

あなたのアプローチは完璧です。私が先に試したときに私はそれをやっていました。エレガントな方法:)悪い句。 –