0
私は着信処理要求を持っていますが、共有リソースが枯渇して同時に処理したくないと思っています。私はまた、いくつかのユニークな鍵を共有する要求が同時に実行されないことを好むだろう:キーごとに観察できるが、決して完了しないためRxJava/RxScalaのgroupByとflatMap(maxConcurrent、...)を組み合わせる
def process(request: Request): Observable[Answer] = ???
requestsStream
.groupBy(request => request.key)
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
requestsForKey
.flatMap(1, process)
})
しかし、上記の動作しません。これを達成する正しい方法は何ですか?動作しません何
は:
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
requestsForKey.take(1)
.flatMap(1, process)
})
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
// This discards all requests after the first if processing takes more than 100 milliseconds
requestsForKey.timeout(100.millis, Observable.empty)
.flatMap(1, process)
})
N私の場合は、リクエストストリームが長持ちし、長持ちし、多くの異なるキーを含んでいるので、動作しません。さらに、私は追加のスレッドを作成するのではなく、同じスレッドプールを使用したいと考えています。 – dtech
@dtech考えられるのは、ジョブを特定のスレッドにスケジュールすることはできませんが、特定のスケジューラにスケジューリングすることができます。シングルスレッドのもの。私は私の答えの中に多数のキーのための解決策を追加しました。 –