2017-02-27 9 views
3

のカップルの後、私は、実行中の数時間後にハングHTTP接続プールがあります。アッカHTTP接続プールがハング時間

private def enqueue(uri: Uri): Future[HttpResponse] = { 
    val promise = Promise[HttpResponse] 
    val request = HttpRequest(uri = uri) -> promise 

    queue.offer(request).flatMap { 
     case Enqueued => promise.future 
     case _ => Future.failed(ConnectionPoolDroppedRequest) 
    } 
} 

、解決:

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = { 
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host) 
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew) 
     .via(pool).toMat(Sink.foreach { 
     case ((Success(res), p)) => p.success(res) 
     case ((Failure(e), p)) => p.failure(e) 
     })(Keep.left).run 
    } 

を私はしてアイテムをキュー

private def request(uri: Uri): Future[HttpResponse] = { 
    def retry = { 
     Thread.sleep(config.dispatcherRetryInterval) 
     logger.info(s"retrying") 
     request(uri) 
    } 

    logger.info("req-start") 
    for { 
     response <- enqueue(uri) 

     _ = logger.info("req-end") 

     finalResponse <- response.status match { 
     case TooManyRequests => retry 
     case OK => Future.successful(response) 
     case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString)) 
     } 
    } yield finalResponse 
} 

この関数の結果は、未来が成功すると常に変換されます。

def get(uri: Uri): Future[Try[JValue]] = { 
    for { 
    response <- request(uri) 
    json <- Unmarshal(response.entity).to[Try[JValue]] 
    } yield json 
} 

すべてが正常に機能し、ログに表示されるのはすべてreq-startであり、req-endではありません。

マイアッカの設定は、このようなものです:

akka { 
    actor.deployment.default { 
    dispatcher = "my-dispatcher" 
    } 
} 

my-dispatcher { 
    type = Dispatcher 
    executor = "fork-join-executor" 

    fork-join-executor { 
    parallelism-min = 256 
    parallelism-factor = 128.0 
    parallelism-max = 1024 
    } 
} 

akka.http { 
    host-connection-pool { 
    max-connections = 512 
    max-retries = 5 
    max-open-requests = 16384 
    pipelining-limit = 1 
    } 
} 

私は、これは構成の問題やコードの問題であるかはわかりません。私は私の並列性と接続数が非常に高いので、それがなければ非常に悪いreq/sレートを得る(私は速く可能であることを要求したい - 私はサーバーを保護するために他のレート制限コードを持っている)。

答えて

3

あなたはサーバーから返された応答のエンティティを消費していません。以下のドキュメントを引用する:

リクエストのエンティティを消費(または破棄)することは必須です。 が誤って消費されたり破棄されたりしない場合、Akka HTTPは と仮定します。入力データはバックプレッシャを維持する必要があり、 の受信データをTCPバックプレッシャーメカニズムでストールします。クライアントは、HttpResponseのステータスに関係なく、エンティティを消費する必要があります。

エンティティは、資源枯渇を避けるために実行する必要があるSource[ByteString, _]という形で提供されます。

あなたが実体を読む必要がない場合は、エンティティがバイト消費する最も簡単な方法は、

res.discardEntityBytes() 

を使用することによって、それらを破棄することである(あなたが追加することにより、コールバックを添付することができます - 例えば - .future().map(...)を) 。

This page in the docsには、必要に応じてバイトを読み取る方法を含めて、これに対するすべての選択肢が記載されています。

--- EDIT

よりコード/情報を提供した後、資源の消費が問題ではないことは明らかです。この実装にはもう一つの大きな赤い旗があります。つまり、再試行メソッドのThread.sleepです。 これは、基本となるアクターシステムのスレッディングインフラストラクチャを枯渇させる可能性が高いブロッキングコールです。

これがなぜ危険なのかの完全な説明はdocsで提供されました。

akka.pattern.afterdocs)を変更してみてください。以下の例:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri)) 
+0

私は実際に応答を受け取った後にエンティティを消費します。私はいくつかの情報でポストを更新します。 – asuna

+0

akka.pattern.afterを使用するように自分のコードを変更しただけで、問題が再発した場合には更新を通知します。 私は以前のThread.sleepコードをプロファイリングしましたが、プロファイラは、動作しなくなったスレッドがスリープしていないことを示しました。私が429を取得するたびに、jvisualvmはスレッドの1つが約500msの間スリープしていることを示し、そのスレッドが再び実行を開始するので、スケジューラを使用して少し懐疑的です。それにもかかわらず、Thread.sleepの使用は本当に悪かったです。 – asuna

+0

私は同じ問題を抱えています。これはスレッドダンプです:https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna