2016-11-22 11 views
2

reactivecouchbase非同期データベースドライバを使用し始めましたが、いくつかの基本的な設計上の問題が発生しています。 従来のアプローチでは、データベースへの接続数を制限することでデータベースにかかる負荷を制限しました。しかし、私は非同期ドライバを使用して、新しいクエリでデータベースを変えることはできますか?akka-streamsマップで非同期ドライバを使用する方法mapAsync

これが重要になったのは、次のような例です。

私はデータベースを呼び出す方法が2通りあると言います。

My機能は、DBへの呼び出し:

asyncCallDB: Future[DBResponse] 
blockingCallDB: DBResponse 

は、今私は、私は2つの異なる機能を使用することができ、ストリームを超えるデシベルの呼び出しをマップする:

Flow.map() 
Flow.mapAsync(numberOfConcurrentCalls)() 

今私の質問どのように希望ですデータベースを呼び出すように選択してください:

Flow.map(blockingCallDB) //One call at a time with back preassure 
Flow.map(asyncCallDB) //Unlimited calls floods db no back pressure? 

Flow.mapAsync(numberOfConcurrentCalls)(blockingCallDB) //Up to numberOfConcurrentCalls at the same time with back pressure 
Flow.mapAsync(numberOfConcurrentCalls)(asyncCallDB) //Unlimited calls floods db no back pressure? 

私はここに不足していると感じていますこの種の死刑を立てる。

+1

[マップとmapAsyncの違い](http://stackoverflow.com/questions/35146418/difference-between-map-and-mapasync) –

+0

「Flow.mapAsync」を'Future [DBResponse]'ではなく 'DBResponse'を返す関数ですか? [Akka API doc](http://doc.akka.io/api/akka/2.4/index.html#[email protected]%5BT%5D(parallelism:Int)(f:Out => scala.concurrent.Future%5BT%5D):FlowOps.this.Repr%5BT%5D)これは不可能です。 –

答えて

3

ReactiveCouchbaseは、Couchbaseサーバーとの通信にAsyncHttpClientを使用します。できるだけ早くsee in the source codeを呼び出して、同時接続数を制限するsetMaximumConnectionsTotalを呼び出します。実際の値は、couchbase.http.maxTotalConnectionsで設定した内容によって異なります。

作成するCouchbaseBucketごとにAsyncHttpClientが1つあります。したがって、CouchbaseBucketごとに最大でmaxTotalConnectionsの接続があります。 Couchbase documentation on N1QL REST APIから

:要求が開始され における文の実行後に、結果が戻ってクライアントにストリーミングされるように

REST APIは、 終端を同期を実行したときに、ステートメント終了の実行。

したがって、実際には、各バケットの同時クエリ数はmaxTotalConnectionsに制限されています。

したがって、DBのバックプレッシャは、常に何らかの方法で制限されています。 maxTotalConnectionsを負でない数値に設定するか、またはRAMが限られているかファイル記述子の数が多いため、クライアントが接続をさらに作成できないためです。

ただし、クライアントのメモリが不足するように、あまりにも多くのを作成することは可能です。Futureこれが当てはまると思われるときはいつも、in this answerのようにmapAsync、または"Buffers and working with rate" (Akka documentation)に記載されている他のテクニックの1つを使用してください。

good description of mapAsync in the Akka documentationあります:未来の結果を返す関数へ

パスの着信要素。 未来が到着すると、結果は下流に渡されます。n個の要素まで同時に処理することができます...

Flow.mapAsyncは、それ自体では何も実行されません、それはちょうどあなたが、その後、SourceSink間を接続する必要がFlowを返すことに注意してくださいrunAkka Quick Start Guideはこれを非常に分かりやすい方法で説明しています。

+0

はい、ブロック非同期のみがブロックリクエストを正しく使用している場合は意味がありますか? – user3139545

+0

@ user3139545さて、ストリームを実行する必要がありますか?だからあなたの流れにある種のシンクを接続する必要があります。シンクは、フローによって結果が出るまで処理されません。 'Flow.mapAsync'はそれ自身では何もしません。別の' Flow'([doc](http://doc.akka.io/api/akka/2.4/index.html#akka.stream.scaladsl .Flow @ mapAsync [T](parallelism:Int)(f:Out => scala.concurrent.Future [T]):FlowOps.this.Repr [T]))。 –

+0

Couchbaseドキュメンテーションの断片は、クエリーが実際に同期していることを示しています:["REST APIは同期的に実行されるため、リクエスト内のステートメントの実行が開始されると、結果はクライアントにストリーミングされ、 "](http://developer.couchbase.com/documentation/server/4.5/n1ql/n1ql-rest-api/index.html) –

関連する問題