2017-04-12 4 views
0

Future[T]またはjava CompletableFuture[T]またはAsyncCompletionHandler[T]org.asynchttpclientのいずれかを返すメソッドがあるとします。私はすべてを呼び出して絞り込みたい。非同期コールバックをサポートするメソッドへの呼び出しの抑制を実装する方法

どうすればよいですか?現在、私はMergeHub.sourceベースのSinkを使用して、すべてのリクエストをそれを通して処理しています。質問があります

  1. 良い方法がありますか?
  2. 私のログ出力では、すべての要求に費やされた時間が予想よりも少なくなっています。どうして ?

ここでは、換言すれば、メインのコンテンツのような複数の場所があるコード

import java.time.ZonedDateTime 

import akka.actor.ActorSystem 
import akka.stream.scaladsl.{MergeHub, Sink, Source} 
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ThrottleMode} 
import org.asynchttpclient.{DefaultAsyncHttpClient, _} 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future, Promise} 
import scala.language.postfixOps 
import scala.util.{Failure, Success, Try} 

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    type PendingRequest =() => Future[Try[Response]] 

    private val throttlingSink = 
    MergeHub.source[PendingRequest] 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4)(_.apply()) 
     .to(Sink.ignore) 
     .run() 

    def wrap(p: Promise[Try[Response]]): AsyncCompletionHandler[Response] = new AsyncCompletionHandler[Response] { 
    override def onThrowable(t: Throwable): Unit = 
     p.success(Failure(t)) 

    override def onCompleted(response: Response): Response = { 
     p.success(Success(response)) 
     response 
    } 
    } 

    def makeRequest(url: String): Future[Response] = { 

    val p = Promise[Try[Response]] 

    Source.single[PendingRequest](() => { 
     asyncHttpClient 
     .prepareGet(url) 
     .execute(wrap(p)) 

     p.future 
    }) 
     .runWith(throttlingSink) 

    p.future.flatMap { 
     case Success(r) => Future.successful(r) 
     case Failure(ex) => Future.failed(ex) 
    } 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"https://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

あります。そしてそれらのすべてはコールの合計としてスロットルでなければなりません。

答えて

1

MergeHubステップを実行せずに、パイプラインを合理化することができます。おおよそ二〜38に0〜秒から開始し、1、2秒ごとに - 私は要求が正しくスロットル参照の両方の実装では、しかし

object Main { 

    private implicit val system = ActorSystem("root") 
    private implicit val executor = system.dispatcher 
    private implicit val mat = ActorMaterializer(ActorMaterializerSettings(system)) 

    def makeRequest(url: String): Future[Response] = { 
    val promise = Promise[Response]() 
    asyncHttpClient.prepareGet(url).execute(new AsyncCompletionHandler[Response] { 
     def onCompleted(response: Response) = { 
     promise.success(response) 
     response 
     } 
     override def onThrowable(t: Throwable) { 
     promise.failure(t) 
     super.onThrowable(t) 
     } 
    }) 
    promise.future 
    } 

    val asyncHttpClient = new DefaultAsyncHttpClient() 

    def main(args: Array[String]): Unit = { 

    val start = ZonedDateTime.now() 
    println("Start!") 
    Source(1 to 20) 
     .throttle(1, FiniteDuration(2000, MILLISECONDS), 1, ThrottleMode.Shaping) 
     .mapAsync(4) { index => 
     println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Requesting $index") 
     makeRequest(s"http://httpbin.org/get?param=$index").map { r => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} s - Got $index - Code ${r.getStatusCode}") 
     } 
     } 
     .runWith(Sink.ignore) 
     .onComplete { 
     case Success(_) => 
      println(s"${ZonedDateTime.now().toEpochSecond - start.toEpochSecond} Done!") 
      asyncHttpClient.close() 
      system.terminate() 
     case Failure(ex) => 
      ex.printStackTrace() 
      asyncHttpClient.close() 
      system.terminate() 
     } 

    Await.result(system.whenTerminated, Duration.Inf) 
    } 
} 

下の例を参照してください。

ここで何を期待していますか?

+0

ありがとう、ステファノ! MergeHubの目的は、すべての呼び出しをREST APIに絞り込むことです。言い換えれば、 'main'の内容のような複数の場所があります。そしてそれらのすべてはコールの合計としてスロットルでなければなりません。 – expert

+0

しかし、 'MergeHub'を使っても、2秒ごとに1回の呼び出しを行うことで、スロットルがおおよそ機能することがわかります。あなたは代わりに何を見たいと思いますか? –

+0

多かれ少なかれ動作しますが、調整の解像度をより正確にすることが可能かどうかは疑問でした。与えられた例で正しく気づくので、私は予想した40秒の代わりに38秒を費やします。 – expert

関連する問題