2017-08-29 1 views
2

私はページ分割されたリソースを返すサービスを使用しています。ページの内容を非同期に取得されページ分割されたIterable [T]を連続Iterable [T]のように消費します。

/** 
* A page of contents that are retrieved asynchronously from their origin 
* 
* @param content The resource object 
* @param nextPageCursor The token representing the next page, or empty if no more pages to consume 
* @tparam T The type of resource withing the page 
*/ 
case class AsyncPage[T](
    val content: Future[Iterable[T]], 
    val nextPageCursor : Future[String] 
) { } 

getPage関数は次のように実装されAsyncPage[T]オブジェクトを返す

trait Service { 
    getPage(pageSize: Int, pageCursor: String): AsyncPage[Resource] 
} 

:これは、以下のインターフェースによって定義される単一の呼び出しを公開しますサービスが使用するいずれのストレージシステムからでも使用できます。

私のアプリケーションの必要性のため、私は実際にページを気にしません。あたかも単一のサービスであるかのようにサービスのリソースを消費できるようにコード化したいと思います。Iterable[T]

しかし、私はサービスの怠惰を維持したいです。私は必要以上のページを要求したくありません。つまり、前のページのすべての要素を消費しない限り、次のページをリクエストしたくないということです。私は1つのページの全体Iterable[T]を消費しているときはいつでも

、私はコードがgetPage(...)機能を使用して、次のページを要求したい、と最後のページnextPageCursorからpageCursorパラメータを提供します。

これを達成する方法を教えてもらえますか?あなたがブロックし気にしない場合

+0

これらの 'Future'sの' Iterable [T] 'ブロックはどうでしょうか?さもなければ、あなたができることは、Iterable [Future [Iterable [T]]です。 –

+0

私のアプリケーションはアクターアクターモデルフレームワークに基づいています。これはアクタースレッドをブロックすることを嫌がり、純粋に非同期で動作するように指示します。 –

答えて

1

さて、あなたはこのような何かを行うことができます。

class FutureIter[+P](fu: => Future[Iterator[P]]) extends AbstractIterator[P] { 
    lazy val iter = Await.result(fu) 
    def hasNext = iter.hasNext 
    def next = iter.next 
} 

    def fold[T](fs: Stream[Future[Iterator[T]]]): Iterator[T]= fs match { 
    case hd #:: tail => new FutureIter(hd) ++ fold(tail) 
    case _ => Iterator.empty 
    } 

    val pages = Stream 
    .iterate(getPage(size, "")) { getPage(size, _.nextPageCursor) } 
    .map(_.contents.map(_.iterator)) 

    val result: Iterator[T] = fold(pages) 

これは、最初のページの前にブロックされ、後続の各ページの最後に次のバッチをロードします。私は、ブロックせずにこれを行う方法はないと思うので、あなたは未来が満足されるまでページがどこで終わるかを知ることができないからです。

また、このコードが生成するイテレータは、より多くのページの検索を停止する基準を指定していないため、無限になります。 .takeWhileコールをpagesに送信して修正することができます。

StreamIteratorに置き換えて、キャッシュされるのではなく、すぐに破棄されるようにすることもできます。私はただStreamを使用しています。これはfoldを少し好きにしています(イテレータでは一致できず、代わりに使用して醜いif(it.hasNext) ...)。それは再帰的であるように

ところで、foldに見えますが、それは実際にないです:++は怠け者なので、あなたはすべての道左手側を通って反復するまでfold(tail)作品は実行されません - 後も外側のfoldはスタックから外れています。

+0

ありがとうございます。私はまだあなたの解決策を試していない、明日行うだろう。しかし、私はいくつかの研究を行ってきましたが、この[リンク](http://koff.io/posts/pagination-and-streams/)では、ページングソースを非同期ストリームにするための3つの実装が紹介されています。私はロックに頼っている再帰的なバージョンも見ることができます。私は他の解決策についてはわかりません。私が別のコメントに投稿したように、私が取り組んでいるアプリケーションはAkkaのアクターモデルを使用しています。主スレッドをロックしていません。(別のExecutionContextで非同期タスクを実行する可能性があります) –

+0

メインスレッドをロックしてはいけません。 'val result:Future [Iterator [T]] = Future(fold(pages)) – Dima

+0

あなたが参照しているリンクのコードはあなたのものを正確にはしません。ページングされた結果(たくさんあります)をトラバースするさまざまな方法を示し、それらをトラバースする多くの方法の1つである値のイテレータに「フラット化」しません。 – Dima

0

あなたがアッカを述べているので、あなたはソートの考えることができるSource[T]を作成することができ、非同期Iterable[T]として:

Source.unfoldAsync[String, T](startPageCursor) { cursor => 
    val page = getPage(pageSize, cursor) 
    for { 
    nextCursor <- page.nextPageCursor 
    it <- page.content 
    } yield Some((nextCursor, it)) 
}.mapConcat(identity) 

これは非常にクリーンかつ完全に非ブロッキングです。しかし、これが適切であれば、あなたのユースケースまであります。

関連する問題