のカップルの後、私は、実行中の数時間後にハングHTTP接続プールがあります。アッカHTTP接続プールがハング時間
private def enqueue(uri: Uri): Future[HttpResponse] = {
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = uri) -> promise
queue.offer(request).flatMap {
case Enqueued => promise.future
case _ => Future.failed(ConnectionPoolDroppedRequest)
}
}
、解決:
private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
.via(pool).toMat(Sink.foreach {
case ((Success(res), p)) => p.success(res)
case ((Failure(e), p)) => p.failure(e)
})(Keep.left).run
}
を私はしてアイテムをキュー
private def request(uri: Uri): Future[HttpResponse] = {
def retry = {
Thread.sleep(config.dispatcherRetryInterval)
logger.info(s"retrying")
request(uri)
}
logger.info("req-start")
for {
response <- enqueue(uri)
_ = logger.info("req-end")
finalResponse <- response.status match {
case TooManyRequests => retry
case OK => Future.successful(response)
case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
}
} yield finalResponse
}
この関数の結果は、未来が成功すると常に変換されます。
def get(uri: Uri): Future[Try[JValue]] = {
for {
response <- request(uri)
json <- Unmarshal(response.entity).to[Try[JValue]]
} yield json
}
すべてが正常に機能し、ログに表示されるのはすべてreq-startであり、req-endではありません。
マイアッカの設定は、このようなものです:
akka {
actor.deployment.default {
dispatcher = "my-dispatcher"
}
}
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 256
parallelism-factor = 128.0
parallelism-max = 1024
}
}
akka.http {
host-connection-pool {
max-connections = 512
max-retries = 5
max-open-requests = 16384
pipelining-limit = 1
}
}
私は、これは構成の問題やコードの問題であるかはわかりません。私は私の並列性と接続数が非常に高いので、それがなければ非常に悪いreq/sレートを得る(私は速く可能であることを要求したい - 私はサーバーを保護するために他のレート制限コードを持っている)。
私は実際に応答を受け取った後にエンティティを消費します。私はいくつかの情報でポストを更新します。 – asuna
akka.pattern.afterを使用するように自分のコードを変更しただけで、問題が再発した場合には更新を通知します。 私は以前のThread.sleepコードをプロファイリングしましたが、プロファイラは、動作しなくなったスレッドがスリープしていないことを示しました。私が429を取得するたびに、jvisualvmはスレッドの1つが約500msの間スリープしていることを示し、そのスレッドが再び実行を開始するので、スケジューラを使用して少し懐疑的です。それにもかかわらず、Thread.sleepの使用は本当に悪かったです。 – asuna
私は同じ問題を抱えています。これはスレッドダンプです:https://gist.github.com/pradyuman/bf83a8f3a293d8c679fcb6dc5f566a80 – asuna