2017-09-22 5 views
1

Source.queueを使用してHttpRequestsをキューに入れ、クライアント側でリモートサーバーからファイルをダウンロードするように調整します。私はSource.queueがスレッドセーフではないことを理解しており、MergeHubを使用してスレッドセーフにする必要があります。以下は、Source.queueを使用し、cachedHostConnectionPoolを使用するコードです。akka-http MergeHubを使用してクライアント側から要求をスロットルする方法

import java.io.File 

import akka.actor.Actor 
import akka.event.Logging 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.client.RequestBuilding 
import akka.http.scaladsl.model.{HttpResponse, HttpRequest, Uri} 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

import scala.concurrent.{Promise, Future} 
import scala.concurrent.duration._ 
import scala.util.{Failure, Success} 

class HttpClient extends Actor with RequestBuilding { 

    implicit val system = context.system 
    val logger = Logging(system, this) 
    implicit lazy val materializer = ActorMaterializer() 

    val config = ConfigFactory.load() 
    val remoteHost = config.getString("pool.connection.host") 
    val remoteHostPort = config.getInt("pool.connection.port") 
    val queueSize = config.getInt("pool.queueSize") 
    val throttleSize = config.getInt("pool.throttle.numberOfRequests") 
    val throttleDuration = config.getInt("pool.throttle.duration") 

    import scala.concurrent.ExecutionContext.Implicits.global 

    val connectionPool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = remoteHost, port = remoteHostPort) 

    // Construct a Queue 
    val requestQueue = 
     Source.queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure) 
     .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
     .via(connectionPool) 
     .toMat(Sink.foreach({ 
      case ((Success(resp), p)) => p.success(resp) 
      case ((Failure(error), p)) => p.failure(error) 
     }))(Keep.left) 
     .run() 

    // Convert Promise[HttpResponse] to Future[HttpResponse] 
    def queueRequest(request: HttpRequest): Future[HttpResponse] = { 
     val responsePromise = Promise[HttpResponse]() 
     requestQueue.offer(request -> responsePromise).flatMap { 
      case QueueOfferResult.Enqueued => responsePromise.future 
      case QueueOfferResult.Dropped  => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) 
      case QueueOfferResult.Failure(ex) => Future.failed(ex) 
      case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) 
     } 
    } 

    def receive = { 
     case "download" => 
     val uri = Uri(s"http://localhost:8080/file_csv.csv") 
     downloadFile(uri, new File("/tmp/compass_audience.csv")) 
    } 

    def downloadFile(uri: Uri, destinationFilePath: File) = { 

     def fileSink: Sink[ByteString, Future[IOResult]] = 
      Flow[ByteString].buffer(512, OverflowStrategy.backpressure) 
      .toMat(FileIO.toPath(destinationFilePath.toPath)) (Keep.right) 

     // Submit to queue and execute HttpRequest and write HttpResponse to file 
     Source.fromFuture(queueRequest(Get(uri))) 
      .flatMapConcat(_.entity.dataBytes) 
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) 
      .map(_.utf8String) 
      .map(d => s"$d\n") 
      .map(ByteString(_)) 
      .runWith(fileSink) 

    } 
} 

IはMergeHubを使用する場合しかし、それは[NOTUSED、(HttpRequestの、プロミス【のHttpResponse])]シンクを返します。私はresponse.entity.dataBytesを抽出し、ファイルシンクを使用してレスポンスをファイルに書き込む必要があります。私はこれを達成するためにMergeHubを使用する方法を理解できません。どんな助けもありがとう。

val hub: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = queueSize) 
    .throttle(throttleSize, throttleDuration.seconds, 1, ThrottleMode.shaping) 
    .via(connectionPool) 
    .toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(error), p)) => p.failure(error) 
    }))(Keep.left) 
    .run() 

答えて

1

現在、Source.Queueは実際にはスレッドセーフです。 MergeHubを使用する場合:

private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] = 
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host).tail.head, port, connectionPoolSettings) 


    val ServerSink = 
    poolFlow.toMat(Sink.foreach({ 
     case ((Success(resp), p)) => p.success(resp) 
     case ((Failure(e), p)) => p.failure(e) 
    }))(Keep.left) 

    // Attach a MergeHub Source to the consumer. This will materialize to a 
    // corresponding Sink. 
    val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] = 
    MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink) 


    val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run() 



    protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = { 
    val responsePromise = Promise[HttpResponse]() 
    Source.single((httpRequest -> responsePromise)).runWith(toConsumer) 
    responsePromise.future.flatMap(handleHttpResponse(_, unmarshal)) 
    ) 
    } 

} 
関連する問題