2017-01-17 10 views
0

私は着信処理要求を持っていますが、共有リソースが枯渇して同時に処理したくないと思っています。私はまた、いくつかのユニークな鍵を共有する要求が同時に実行されないことを好むだろう:キーごとに観察できるが、決して完了しないためRxJava/RxScalaのgroupByとflatMap(maxConcurrent、...)を組み合わせる

def process(request: Request): Observable[Answer] = ??? 

requestsStream 
    .groupBy(request => request.key) 
    .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     requestsForKey 
     .flatMap(1, process) 
    }) 

しかし、上記の動作しません。これを達成する正しい方法は何ですか?動作しません何

は:

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently 
     requestsForKey.take(1) 
     .flatMap(1, process) 
    }) 

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing 
     // This discards all requests after the first if processing takes more than 100 milliseconds 
     requestsForKey.timeout(100.millis, Observable.empty) 
     .flatMap(1, process) 
    }) 

答えて

1

ここで私はこれを達成するために管理方法を説明します。 (同じキーを持つメッセージが順番に処理されるように)私は、専用の単一スレッドスケジューラを割り当てていますそれぞれの固有のキーについて:

@Test 
public void groupBy() throws InterruptedException { 
    final int NUM_GROUPS = 10; 
    Observable.interval(1, TimeUnit.MILLISECONDS) 
      .map(v -> { 
       logger.info("received {}", v); 
       return v; 
      }) 
      .groupBy(v -> v % NUM_GROUPS) 
      .flatMap(grouped -> { 
       long key = grouped.getKey(); 
       logger.info("selecting scheduler for key {}", key); 
       return grouped 
         .observeOn(assignScheduler(key)) 
         .map(v -> { 
          String threadName = Thread.currentThread().getName(); 
          Assert.assertEquals("proc-" + key, threadName); 
          logger.info("processing {} on {}", v, threadName); 
          return v; 
         }) 
         .observeOn(Schedulers.single()); // re-schedule 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(1000); 
} 

私の場合はキーの数(NUM_GROUPSは)ので、私は専用のスケジューラを作成小さいです各キー:

Scheduler assignScheduler(long key) { 
    return Schedulers.from(Executors.newSingleThreadExecutor(
     r -> new Thread(r, "proc-" + key))); 
} 

キーの数がそれぞれ1のためのスレッドを捧げるために、無限または大きすぎる場合は、あなたがスケジューラのプールを作成し、このようにそれらを再利用することができます

Scheduler assignScheduler(long key) { 
    // assign randomly 
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)]; 
} 
+0

N私の場合は、リクエストストリームが長持ちし、長持ちし、多くの異なるキーを含んでいるので、動作しません。さらに、私は追加のスレッドを作成するのではなく、同じスレッドプールを使用したいと考えています。 – dtech

+0

@dtech考えられるのは、ジョブを特定のスレッドにスケジュールすることはできませんが、特定のスケジューラにスケジューリングすることができます。シングルスレッドのもの。私は私の答えの中に多数のキーのための解決策を追加しました。 –

関連する問題