2016-03-21 13 views
0

私はakka-httpを使用して、単一のホスト(http:// akka.ioなど)にhttpリクエストを送信しようとしています。問題は、作成されたフロー(Http()。cachedHostConnectionPool)は、N個のhttp要求が行われた後にのみ応答を出し始め、Nはmax-connectionsに等しいということです。akka httpが最初のN要求に対して応答を発行しないのはなぜですか?

import scala.util.Failure 
import scala.util.Success 
import com.typesafe.config.ConfigFactory 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.Uri.apply 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Source 

object ConnectionPoolExample extends App { 

    implicit val system = ActorSystem() 
    implicit val executor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 

    val connectionPoolSettings = ConnectionPoolSettings(config).withMaxConnections(10) 
    lazy val poolClientFlow = Http().cachedHostConnectionPool[Unit]("akka.io", 80, connectionPoolSettings) 

    val fakeSource = Source.fromIterator[Unit] {() => Iterator.continually { Thread.sleep(1000);() } } 
    val requests = fakeSource.map { _ => println("Creating request"); HttpRequest(uri = "/") -> (()) } 

    val responses = requests.via(poolClientFlow) 

    responses.runForeach { 
    case (tryResponse, jsonData) => 
     tryResponse match { 
     case Success(httpResponse) => 
      httpResponse.entity.dataBytes.runWith(Sink.ignore) 
      println(s"status: ${httpResponse.status}") 
     case Failure(e) => { 
      println(e) 
     } 
     } 
    } 
} 

出力は次のようになります。

Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
Creating request 
status: 200 OK 
... 

私はプールが空き接続の外にあるときに、すぐに彼らは準備ができているとしていない回答を放出できるようになる任意の設定パラメータを見つけることができないのです。

ありがとうございます!

答えて

0

理由は、Thread.sleepを呼び出すことによってクライアントが他の作業を行うのをブロックするからです。これは、反応するプログラム内では単に禁止されているためです。適切で簡単な方法は、Source.tickを使用することです。

+0

ありがとうございます。特定の例は、Source.tickを使用して解決します。このfakeSourceでThread.sleep(1000)を使用するのは残念です。実際のソースはKafkaからの読み込みであり、GraphStage [SourceShape [A]] ... 'val stream = consumerMap.getOrElse(topicName、List())を拡張することによって実装されています。head setHandler(out、new OutHandler { 単位= {。 ヴァルjsonData = JsonParser(stream.head.message())にconvertTo [A] プッシュ(OUT、jsonData) }}) ' ...それはまた、ブロックされている:onPull()DEFオーバーライドクライアント? – uladzimir

+0

はい、そのソースに '.async'を追加して、残りのストリームから切り離したいと思うでしょう。また、適切なカフカの統合についても取り組んでいます。リアクションカフカを参照してください。 –

関連する問題