2016-01-06 7 views
34

Akka HTTPを使用して自分の要求を基本認証しようとしています。 私は外部リソースを持っているので、このリソースに休憩をかけなければなりません。Akka HTTP:今後ブロッキングするとサーバーがブロックされる

これは処理に時間がかかりますが、処理中はこのAPIの残りの部分がブロックされているようですが、この呼び出しを待っています。 私は非常に簡単な例でこれを再現している:私は、ログエンドポイントに投稿する場合

// used dispatcher: 
implicit val system = ActorSystem() 
implicit val executor = system.dispatcher 
implicit val materializer = ActorMaterializer() 


val routes = 
    (post & entity(as[String])) { e => 
    complete { 
     Future{ 
     Thread.sleep(5000) 
     e 
     } 
    } 
    } ~ 
    (get & path(Segment)) { r => 
    complete { 
     "get" 
    } 
    } 

、私のgetエンドポイントは、ログエンドポイントが規定され、5秒間待って立ち往生しています。

これは予期された動作ですか、そうであれば、API全体をブロックせずにブロック操作を行うにはどうすればよいですか?

答えて

118

あなたが守っ何することは予想される動作です - まだ、もちろん、それは非常に悪いです。それを守るための既知のソリューションとベストプラクティスが存在することをお勧めします。この回答では、問題を短く、長く、そして深く説明する時間を過ごしたいと思います。

短い答え:「は、ルーティングインフラストラクチャをブロックしない!」、常に操作を阻止するための専用のディスパッチャを使用します!

原因表示された症状の原因:問題は、ブロッキング先物が実行されるディスパッチャとしてcontext.dispatcherを使用しているという問題です。ルーティングインフラストラクチャは、同じディスパッチャ(実際にはスレッドの束)を使用して、実際に着信要求を処理します。使用可能なすべてのスレッドをブロックすると、ルーティングインフラストラクチャが枯渇します。 (議論とベンチマークのためのことは、Akka HTTPがこれを守ることができるなら、私はそれを私の研究todo-listに追加します)。

ブロッキングは、同じディスパッチャの他のユーザーに影響を与えないように特別な注意を払って処理する必要があります(これは、AkkaドキュメントのセクションBlocking needs careful managementで説明されているとおりです)。あなたの実行時間の長い操作は本当に一回の操作ではありませんが、シリーズは、その、あなたが別の上にそれらを分離していることができれば -

私はここでの注意にもたらすたかった何か他のものは1つが可能な場合にすべてのAPIをブロック避けるべきであるということです俳優、またはシーケンスされた先物。とにかく、もし可能ならば、そのようなブロッキングコールを避けて、そしてもしあなたがそうしなければならないならば、以下は、それらを適切に処理する方法を説明しています。

で詳細な分析と解決策

今、私たちが間違っているかを知ること、概念的に、さんはまさに上記のコードで壊れているものを見てみましょう、そしてどのようにこの問題への適切なソリューションがどのように見えます:

カラー=スレッド状態:

  • ターコイズ - SLEEPING
  • オレンジ - WAITING
  • 緑 -

RUNNABLEは、今度は、3つのコードの断片とどのように影響ディスパッチャ、およびアプリケーションのパフォーマンスを調べてみましょう。後にし、[B] GETリクエストを(そのための最初の質問にコードの上を参照)を要求続ける、それが

  • がブロックしていない[A]

    • :アプリは以下の荷重下に置かれているこの動作を強制するにはAしばらく不正なコードに将来

    1) [bad]ディスパッチャ行動を返す前に5secondブロッキングを引き起こします火災2000 POSTリクエスト、:

    // BAD! (due to the blocking in Future): 
    implicit val defaultDispatcher = system.dispatcher 
    
    val routes: Route = post { 
        complete { 
        Future { // uses defaultDispatcher 
         Thread.sleep(5000)     // will block on the default dispatcher, 
         System.currentTimeMillis().toString // starving the routing infra 
        } 
        } 
    } 
    

    私たちはアプリケーションを[a]ロードに公開しています。すでに多くのakka.actor.default-dispatcherスレッドが表示されています - 要求を処理しています - 小さい緑色のスニペット、そこ。

    blocking is killing the default dispatcher

    その後、我々はこれらのスレッドのブロッキング原因[B]負荷、スタート - あなたは前にアイドル状態にした後、ブロッキングに入る早期スレッド「デフォルト・ディスパッチャ-2,3,4」を参照してくださいすることができます。また、新しいスレッドは "default-dispatcher-18,19,20,21 ..."で開始されますが、すぐに眠りにつきます(!) - ここで貴重なリソースを無駄にしています!

    このように起動されるスレッドの数は、デフォルトのディスパッチャの設定によって異なりますが、50を超えることはありません。私たちは2kのブロック操作を実行したばかりなので、スレッドプール全体が枯渇しています - ルーティングのインフラに他の要求を処理するためのスレッドがないようなブロック操作が支配的です。あなたのapplication.confのconfigureで

    2) [good!]ディスパッチャ行動の良い構造化されたコード/ディスパッチャ: -

    (常に、以下に示すようなブロックの挙動を隔離ところでアッカのベストプラクティスである)のは、それについて何かをやってみましょうこのディスパッチャは、行動を阻止するための専用:

    my-blocking-dispatcher { 
        type = Dispatcher 
        executor = "thread-pool-executor" 
        thread-pool-executor { 
        // in Akka previous to 2.4.2: 
        core-pool-size-min = 16 
        core-pool-size-max = 16 
        max-pool-size-min = 16 
        max-pool-size-max = 16 
        // or in Akka 2.4.2+ 
        fixed-pool-size = 16 
        } 
        throughput = 100 
    } 
    

    あなたはに多くをお読みくださいのドキュメントを参照してください。しかし、主な点は、ThreadPoolExecutorを選んだことです。このスレッドには、ブロック操作に使用できるスレッドがハード制限されています。サイズの設定は、アプリケーションの機能、およびサーバーのコア数によって異なります。

    次は、デフォルトの代わりに、それを使用する必要があります。

    // GOOD (due to the blocking in Future): 
    implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher") 
    
    val routes: Route = post { 
        complete { 
        Future { // uses the good "blocking dispatcher" that we configured, 
          // instead of the default dispatcher – the blocking is isolated. 
         Thread.sleep(5000) 
         System.currentTimeMillis().toString 
        } 
        } 
    } 
    

    私たちは、同じ負荷、通常のリクエストの最初のビットを使用してアプリに圧力をかけた後、私たちは、ブロッキングのものを追加します。これは、スレッドプールは、この場合にはどのように動作するかです:

    the blocking pool scales to our needs

    は通常の要求は簡単にデフォルトのディスパッチャによって処理されるので、最初は、あなたがそこにいくつかの緑の線見ることができます - それは、実際の実行だ(私はしませんよ実際にはサーバーに負荷がかかるため、ほとんどアイドルです)。

    ここで、ブロック操作を開始すると、my-blocking-dispatcher-*が起動し、設定されたスレッド数まで起動します。それはそこのすべての睡眠を扱います。また、これらのスレッドで何らかの何も起こっていなければ、それらをシャットダウンします。プールをもう一杯にしてサーバーを襲った場合、プールは新しいスレッドを開始してスリープを処理しますが、その間には "ただそこにとどまり、何もしない"。

    この設定を使用すると、通常のGETリクエストのスループットは影響を受けませんでしたが、(まだかなり無料の)デフォルトのディスパッチャではまだうまく処理されていました。

    これは、リアクティブアプリケーションでのあらゆる種類のブロックを処理するために推奨される方法です。アプリの動作不良部分を「隔離する」(または「隔離する」)と呼ばれることがよくあります。この場合、悪い動作はスリープ/ブロッキングです。

    3) [workaround-ish]ディスパッチャ行動blockingが正しく適用:

    をこの例では、ブロッキングOPSに直面したときに助けることができるscaladoc for scala.concurrent.blockingメソッドを使用します。一般的には、ブロッキング操作から生き残るために、より多くのスレッドがスピンアップされます。

    // OK, default dispatcher but we'll use `blocking` 
    implicit val dispatcher = system.dispatcher 
    
    val routes: Route = post { 
        complete { 
        Future { // uses the default dispatcher (it's a Fork-Join Pool) 
         blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
           // but at the cost of exploding the number of threads (which eventually 
           // may also lead to starvation problems, but on a different layer) 
         Thread.sleep(5000) 
         System.currentTimeMillis().toString 
         } 
        } 
        } 
    } 
    

    アプリは、次のように動作します:ああ、これはよ」のヒントを阻止するため

    blocking causes more threads to be started

    あなたは新しいスレッドのLOTが作成されていることがわかりますが、これはありますブロックしているので、もっとスレッドが必要です」これにより、ブロックされた時間の合計が1)の例よりも小さくなりますが、ブロック操作が完了しても何も処理しない何百ものスレッドがあります...もちろん、FJPはこれを実行します)しかし、しばらくの間、私たちはブロッキング動作のために専用のスレッド数を正確に知っている2)ソリューションとは対照的に、大きな(制御されていない)スレッドを実行します。を合計

    のベストプラクティスを利用できるブロック操作のためのディスパッチャを持って、2)に示すパターンを使用して、そこにそれらを実行することです:-)デフォルトのディスパッチャをブロックすることはありません。

    希望すると、happy hakking

    話題アッカのHTTPバージョン2.0.1

    プロファイラを使用:多くの人がそうこの情報を追加して、私は上記の写真でスレッドの状態を視覚化するために使用するものプロファイラ私的にこの答えに応じて私に尋ねてきましたここに:私はあなたが自由にVisualVM from OpenJDKを使用して同じ結果を得ることができますが、素晴らしい商用プロファイラ(OSSは無料)であるYourKitを使用しました。

  • +1

    我々は今、公式文書の一部として、この返信が含まれていました。http://doc.akka.io/docs/akka/2.4/scala/http/handling-blocking-operations-in-akka-http-routes.html#handling -blocking-in-http-routes-scala –

    +0

    上記のリンクは壊れています。 –

    +0

    回答を返信してバックグラウンドで作業し続ける場合はどうすればよいですか? [この](https://gist.github.com/asarkar/37e4cb026c463f6334617e923cfc4b12)が動作しているようです。 –

    3

    奇妙ですが、私にとってはすべてが正常に機能します(ブロッキングなし)。ここでは、コードは次のとおりです。

    import akka.actor.ActorSystem 
    import akka.http.scaladsl.Http 
    import akka.http.scaladsl.server.Directives._ 
    import akka.http.scaladsl.server.Route 
    import akka.stream.ActorMaterializer 
    
    import scala.concurrent.Future 
    
    
    object Main { 
    
        implicit val system = ActorSystem() 
        implicit val executor = system.dispatcher 
        implicit val materializer = ActorMaterializer() 
    
        val routes: Route = (post & entity(as[String])) { e => 
        complete { 
         Future { 
         Thread.sleep(5000) 
         e 
         } 
        } 
        } ~ 
        (get & path(Segment)) { r => 
         complete { 
         "get" 
         } 
        } 
    
        def main(args: Array[String]) { 
    
        Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure { 
         case e => 
         system.shutdown() 
        } 
        } 
    } 
    

    また、あなたはあなたにonCompleteまたはonSuccessディレクティブに非同期コードをラップすることができます

    onComplete(Future{Thread.sleep(5000)}){e} 
    
    onSuccess(Future{Thread.sleep(5000)}){complete(e)} 
    
    +0

    ここうん、同じ。私はakka-http 2.0.1 – expert

    +0

    でそれをテストしました。また、futureをonComplete/onSuccessディレクティブにラップすることもできます。 –

    関連する問題