TLDR:リクエストごとにストリームを具体化する(つまり、短命のストリームを使用する)方が良いですか?ストリームの?akka-stream + akka-httpライフサイクル
詳細:私はHTTPリクエストを受け取り、いくつかのサードパーティのダウンストリームサービス(私によって管理されていない)に分散し、結果を集約して戻します。私はクライアントの実装にakka-httpを使用しており、サーバー用にスプレーしています(レガシー、時間の経過とともにakka-httpに移行します)。概略的に:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
これは要求ごとにストリームを実体または回(の一部)のストリームを実体とリクエスト間で共有することによってのいずれかで達成することができます。
要求ごとにマテリアライズすると、マテリアライゼーションオーバーヘッドが発生するとなり、接続プールをどのように活用するのかは不明です。問題はhereと記述されています(多くの実体化がプールを使い果たす可能性があります)。私はhereのような長期実行HTTPストリームでプールをラップでき、mapAsync
"上流"にラップすることができますが、エラー処理戦略は私には分かりません。 1つのリクエストが失敗し、ストリームが終了すると、プールも停止しますか?さらに、要求と応答が順番に返されないため、要求と応答を調整する必要があるようです。
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
Flow[HttpRequest]
.map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
.via(connectionFlow)
.map { case (response, _) => response }
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.via(httpFlow)
.mapAsync(1) {
// response handling logic
}
.runWith(Sink.last)
})
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val result = Range(1 to 5).foreach{ i => {
Source.single(i)
.map(HttpRequest(...))
.mapAsync(1)(queueRequest)
.mapAsync(1) {
// somehow reconcile request with response?
// response handling logic
}
.runWith(Sink.last)
})
ストリームを共有すると、エラー処理と同様の問題が発生します。エラーが発生した場合、すべてのリクエストが飛んできたときにそのストリームを停止させる可能性があります。コードはhost level APIに似ていますが、ストリーム全体に向かってキューが並んでいます。
この場合、どちらの方が良いですか?
私は両方のソリューションを実装しようとしましたが、実装の各段階で多くの設計選択肢がありますので、「正しい」パスであっても簡単に取り締まることができます。
私はそれが無視できると信じていますが、それはakka-httpサーバーが動作するのと同じ方法です。
ありがとうラモン!私の問題は、私は実際には 'Source'を持っていないということです。ストリームへのすべての入力はhttpが要求されるので、たいていは' Source.single'を持っています。この場合、どのようにして1つのフローを得ることができますか? – Tim
さらに、1つのフローで要求タイムアウトをどのように処理できますか?私が見つけた唯一のタイムアウトは 'completionTimeout'ステージですが、ストリームに失敗し、エラーを下流に伝播しません。 – Tim