2017-09-26 22 views
4

TLDR:リクエストごとにストリームを具体化する(つまり、短命のストリームを使用する)方が良いですか?ストリームの?akka-stream + akka-httpライフサイクル

詳細:私はHTTPリクエストを受け取り、いくつかのサードパーティのダウンストリームサービス(私によって管理されていない)に分散し、結果を集約して戻します。私はクライアントの実装にakka-httpを使用しており、サーバー用にスプレーしています(レガシー、時間の経過とともにakka-httpに移行します)。概略的に:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

これは要求ごとにストリームを実体または回(の一部)のストリームを実体とリクエスト間で共有することによってのいずれかで達成することができます。

要求ごとにマテリアライズすると、マテリアライゼーションオーバーヘッドが発生するとなり、接続プールをどのように活用するのかは不明です。問題はhereと記述されています(多くの実体化がプールを使い果たす可能性があります)。私はhereのような長期実行HTTPストリームでプールをラップでき、mapAsync "上流"にラップすることができますが、エラー処理戦略は私には分かりません。 1つのリクエストが失敗し、ストリームが終了すると、プールも停止しますか?さらに、要求と応答が順番に返されないため、要求と応答を調整する必要があるようです。

// example of stream per request 

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port) 
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] = 
    Flow[HttpRequest] 
     .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream. 
     .via(connectionFlow) 
     .map { case (response, _) => response } 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .via(httpFlow) 
    .mapAsync(1) { 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 


// example of stream per request with long running http stream 

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue 
def queueRequest(request: HttpRequest): Future[HttpResponse] 

val result = Range(1 to 5).foreach{ i => { 
    Source.single(i) 
    .map(HttpRequest(...)) 
    .mapAsync(1)(queueRequest) 
    .mapAsync(1) { 
     // somehow reconcile request with response? 
     // response handling logic 
    } 
    .runWith(Sink.last) 
}) 

ストリームを共有すると、エラー処理と同様の問題が発生します。エラーが発生した場合、すべてのリクエストが飛んできたときにそのストリームを停止させる可能性があります。コードはhost level APIに似ていますが、ストリーム全体に向かってキューが並んでいます。

この場合、どちらの方が良いですか?

私は両方のソリューションを実装しようとしましたが、実装の各段階で多くの設計選択肢がありますので、「正しい」パスであっても簡単に取り締まることができます。

私はそれが無視できると信じていますが、それはakka-httpサーバーが動作するのと同じ方法です。

答えて

1

通常、単一の接続Flowを使用して、すべてのリクエストをその単一のフローで送信するほうがずっと優れています。主な理由は、新しいマテリアライゼーションが実際には新しい接続プールの設定に応じて新しいConnectionが形成されるという事実のためです。

あなたは、これはいくつかの合併症につながることを正しい:

オーダー:あなたが接続に渡しているタプル における第二の値としてランダムUUIDを提供することにより、あなたがあなたの能力を排除している流れ要求を応答に関連付ける。タプル内のその余分なTの値は、あなたがフローからどのHttpResponseを取得しているかを知る "相関ID"として使用できます。あなたの特定の例では、あなたが作成したRangeからInt初期使用することができます

val responseSource : Source[(Try[HttpResponse], Int), _] = 
    Source 
    .fromIterator(() => Iterator range (0,5)) 
    .map(i => HttpRequest(...) -> i) 
    .via(connectionFlow) 

を今すぐ各応答では、応答を処理するために使用することができ、オリジナルのInt値が付属しています。

エラー処理:「1つのリクエストが失敗し、ストリームが終了しました」という記載が間違っています。単一の要求の失敗は必ずしもストリームの障害を引き起こすわけではありません。むしろ、単に接続フローから(Failure(exception), Int)値を取得します。どのIntが失敗を引き起こしたか知っているので、フローからの例外があります。

+0

ありがとうラモン!私の問題は、私は実際には 'Source'を持っていないということです。ストリームへのすべての入力はhttpが要求されるので、たいていは' Source.single'を持っています。この場合、どのようにして1つのフローを得ることができますか? – Tim

+0

さらに、1つのフローで要求タイムアウトをどのように処理できますか?私が見つけた唯一のタイムアウトは 'completionTimeout'ステージですが、ストリームに失敗し、エラーを下流に伝播しません。 – Tim