2017-07-09 6 views
0

は、私がここにhttp4s & FS2にシーケンシャルRESTのAPIクローラを書いた:http4s&fs2でREST APIクローラーを並列化する方法は?

https://gist.github.com/NicolasRouquette/656ed7a2d6984ce0995fd78a3aec2566

これは、IDの開始セットを取得するためのREST APIサービスを照会IDのバッチのための要素を取得し、ベース続けることですこれらの要素で見つかった相互参照IDに、取得するすべての要素のマップを取得して返す新しいIDがなくなるまで繰り返します。

これは機能します。しかし、パフォーマンスは不十分です - 遅すぎます!

私はサーバーにアクセスできないため、バッチサイズを10,50,100,200,500、さらには1つのクエリですべてのIDをバッチ処理して試してみました。バッチサイズではクエリ時間が大幅に増加します。 大きなサイズ(500とすべて)で、私はサーバーからHTTP 500応答を得ました。

スレッドのプールを使用してロードバランシング方式で並列クエリをバッチ処理することを試したいと思います。しかし、fs2ドキュメントに基づいてこれを行う方法は私には不明です。

誰かがこれを達成する方法を提案できますか?

http4sの使用について& fs2:このライブラリは、簡単なクライアント側のプログラミングではかなり簡単に使用できます。サポートタスク、ストリームなどに重点を置くと、私は、バッチ並列クエリは何らかの形で実行可能でなければならないと考えています。

答えて

0

fs2.concurrent.join複数のストリームを同時に実行することができます。ガイドの特定のセクションはhttps://github.com/functional-streams-for-scala/fs2/blob/v0.9.7/docs/guide.md#concurrency

で利用できます。使用する場合は、idsのキューをチャンクし、httpタスクを作成してストリームにラップできます。次に、このストリームのストリームをjoinと同時に実行し、結果を結合します。

def createHttpRequest(ids: Seq[ID]): Task[(ElementMap, Set[ID])] = ??? 

def fetch(queue: Set[ID]): Task[(ElementMap, Set[ID])] = { 
    val resultStreams = Stream.emits(queue.toSeq) 
    .vectorChunkN(batchSize) 
    .map(createHttpRequest) 
    .map(Stream.eval) 

    val resultStream = fs2.concurrent.join(maxOpen)(resultStreams) 
    resultStream.runFold((Map.empty[ID, Element], Set.empty[ID])) { 
    case ((a, b), (_a, _b)) => (a ++ _a, b ++ _b) 
    } 
} 
関連する問題