0
私はakka-httpを使用して、単一のホスト(http:// akka.ioなど)にhttpリクエストを送信しようとしています。問題は、作成されたフロー(Http()。cachedHostConnectionPool)は、N個のhttp要求が行われた後にのみ応答を出し始め、Nはmax-connectionsに等しいということです。akka httpが最初のN要求に対して応答を発行しないのはなぜですか?
import scala.util.Failure
import scala.util.Success
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri.apply
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object ConnectionPoolExample extends App {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val config = ConfigFactory.load()
val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10)
lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings)
val fakeSource = Source.fromIterator[Unit] {() => Iterator.continually { Thread.sleep(1000);() } }
val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) }
val responses = requests.via(poolClientFlow)
responses.runForeach {
case (tryResponse, jsonData) =>
tryResponse match {
case Success(httpResponse) =>
httpResponse.entity.dataBytes.runWith(Sink.ignore)
println(s"status: ${httpResponse.status}")
case Failure(e) => {
println(e)
}
}
}
}
出力は次のようになります。
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
Creating request
status: 200 OK
Creating request
status: 200 OK
Creating request
status: 200 OK
...
私はプールが空き接続の外にあるときに、すぐに彼らは準備ができているとしていない回答を放出できるようになる任意の設定パラメータを見つけることができないのです。
ありがとうございます!
ありがとうございます。特定の例は、Source.tickを使用して解決します。このfakeSourceでThread.sleep(1000)を使用するのは残念です。実際のソースはKafkaからの読み込みであり、GraphStage [SourceShape [A]] ... 'val stream = consumerMap.getOrElse(topicName、List())を拡張することによって実装されています。head setHandler(out、new OutHandler { 単位= {。 ヴァルjsonData = JsonParser(stream.head.message())にconvertTo [A] プッシュ(OUT、jsonData) }}) ' ...それはまた、ブロックされている:onPull()DEFオーバーライドクライアント? – uladzimir
はい、そのソースに '.async'を追加して、残りのストリームから切り離したいと思うでしょう。また、適切なカフカの統合についても取り組んでいます。リアクションカフカを参照してください。 –