2016-09-16 1 views
3

応答時間が非常に短いWebアプリケーションにakka-httpを使用しています。私はCompletableFutureのチェーンを持つルーティングDSL completeWithFutureメソッドを使用しています。HttpEntityをtoStrict()を介してStringに変換した後に遅延を引き起こす原因

CompletionStageメソッドのXXXasyncバリアントを使用して同じエグゼキュータを渡すと、ステージの処理に使用されるスレッドが任意に変更される可能性があることに気付きました。すべてのスレッド指定されたエグゼキュータの名前が使用されます。だから私は最初のCompletableFutureに私のカスタムエグゼキュータを渡し、それらのために同じスレッドを使うために、通常のバリアントで次のすべてのステージを連鎖させました。

問題:一段階は、HttpEntity.toStrict()を介してStringにHttpEntityの変換を行い、その方法はakka.actor.default-dispatcherからスレッドを使用します。ワークロードが増加すると、ターゲットレスポンス時間よりも遅くタイムアウト例外が発生しないことを示すtoStrictにタイムアウトを渡したにもかかわらず、次の段階の開始時に必要な応答時間を超えるリクエストがますます増加しています。

簡体コード

private Route handleRequest(final HttpRequest request) { 
    return completeWithFuture(CompletableFuture.runAsync(() -> preprocessing(), systemDispatcher) // Dispatcher 1 
      .thenCompose((preprocessingResult) -> // please ignore that preprocessingResult is not used in that simplified version 
      entityToString(request.entity()).thenApply((requestString) -> generateResponse(requestString)))); 
} 

public CompletionStage<String> entityToString(final HttpEntity entity) { 
     long start = System.nanoTime(); 
     return entity.toStrict(bodyReadTimeoutMillis, materializer).thenApplyAsync((final Strict strict) -> { 
      System.out.println(start-System.nanoTime()); // varies between <1ms and >500ms 
      return strict.getData().utf8String(); 
     }, systemDispatcher); // Dispatcher 2 
    } 

だから私の推測では、akkasのデフォルトの俳優のディスパッチャスレッドとバックのいずれかに私のカスタムエグゼキュータのスレッドからのスイッチが問題を引き起こしているということです。

質問:私のentityToStringメソッドの遅延の別の説明はありますか? toStrictと同じように、つまりスレッドを複数回切り替えることを避けながら、チャンクされたメッセージ本文全体を文字列として取得する方法がありますか?

遅いPOSTリクエストの処理を中止するには、toStrictメソッドのタイムアウト機能が必要です。

最後の日のためにそれについて考えるUPDATE

は、私はそれがスレッドを切り替えることなく、保証をアッカ、非ブロック読み取りを実現することは不可能であると信じています。したがって、実際の問題は、おそらくtoStrictの後のスケジューリングによって引き起こされる顕著に高い遅延です。

Iは異なるディスパッチャを使用しようとした(上記のコード内のコメントディスパッチャ1/ディスパッチャ2参照)、住民は遅延が50ミリ秒を超える場合にはディスパッチャ2のカウントログオン。私は適切なドキュメントを見つけることができませんが、私はこれがスケジュールされたタスクの数であると仮定します。 私は10000リクエスト、200同時接続でapacheベンチを実行し、50msを超える遅延を55回得ました。出力は最大80人の住民を示しています。

私はAmazons m3.2xlargeインスタンス(8 vCPU、30GB RAM、Ubuntu 16.04)でこのテストを実行しました。他のプロセスでCPUを消費していません)。ディスパッチャのタイプはfork-join-executorであり、並列性係数= 1です。

さらに多くの同時リクエスト数を持つ実際のトラフィックは、制限を超えるリクエストの割合が増加します(最大50%)。

リクエストを処理する平均時間は1ms未満です。toStrictの後のそのまれな遅延の原因とそれを避ける方法は何ですか?

+0

(1)toStrictは、単一のByteStringにデータを収集するとき、ネットワークからのデータを待つ必要があります。したがって、クライアントが実際に完全な要求を送信するのにかかる時間に依存します。 (2)8コアの同時リクエストを200回実行すると、1回のリクエストごとにわずかなCPU時間しか費やされないため、全体の実行時間は少なくともその分だけ増加します。 – jrudolph

+0

ほとんどの現実的なシナリオでは、クライアントにアクセスしているクライアントがデータを送信すると速度が遅くなる可能性があるため、クライアントを制御しないと、はるかに高いタイムアウトを適用する必要があります。いずれにしても、着信データをストリーミング形式で処理できる方が良いでしょう。そのため、1)toStrict 2を使用する必要はありません。2)受信データを処理する前にすべてのデータをバッファリングする必要はありません。 – jrudolph

+0

私の現在のシナリオでは、リアルタイムオークションに入札しています。応答は50ms後に有効ではありません。残念なことに、不完全なHTTP POSTボディを受け取ることがあります。要求に続く処理のためのリソースを解放するために、欠落している部分を待つ必要があります。ストリームで受信データを処理しようとしましたが、1)ストリームでHTTPリクエストに応答する方法2)50ms後にリクエストの処理を停止する3)多くのリクエストにTCPバックプレッシャを適用する。リクエストに複雑なJSONオブジェクトが含まれています。処理する前に解析する必要があります。 – teano

答えて

1

akka-streamsを使用してエンティティのコンテンツを取得し、エンティティの読み込みではなく別の方法でタイムアウトの要件を取得できます。

entity.getDataBytes().runFold(ByteString.empty(), ByteString::concat, materializer) 
.thenCompose(r -> r.utf8String()); 
+0

これを試しましたが、ストリームには 'akka.actor.default-dispatcher'も使用されています。 'thenCompose'は' thenApply'でなければなりません。 Wicheverでは、常に別のスレッドプールのスレッドを使用します。あなたのコードではexecutorが指定されていないので、 'ForkJoinPool.commonPool'からのスレッドを使います。また、私はストリームを止めて接続を閉じる方法を知らないでしょう。もし 'toStrict'のように、何とかタイムアウトを達成できれば。最後に、私はentity.getDataBytes()は、 'toStrict'ドキュメントで説明されているように、メッセージ本体のすべての可能な部分を集めていないと思います。 – teano

+0

カスタムディスパッチャを使用するように構成されたアクタのコンテキストを使用してマテリアライザを作成しようとしましたか? – Tiago

+0

同じ結果です。最後の日のことを考えてみると、スレッドを切り替えることなく、akkaが保証するノンブロッキングの読み込みを実現することは不可能だと私は信じています。実際の問題は、 'toStrict'の後のスケジューリングによって引き起こされる著しく高い遅延です。 – teano

関連する問題