2017-08-11 6 views
3

RxJava2に関する質問があります。私は、Listの結果を並列に実行するために、固定スレッドプールの別のスレッドでコンシューマを実行したいと考えています。ここに私のコードは次のとおりです。RxJava2でコンシューマを並列実行する方法は?

List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g"); 
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size()); 
    Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() { 
     @Override 
     public void accept(String data) throws Exception { 
      System.out.println(data + " forEach, thread is " + Thread.currentThread().getName()); 
     } 
    }); 

私は結果は得た:私がしたいことは、各consumor並列に別のスレッドで実行し、その結果である実際

a forEach, thread is pool-1-thread-1 
b forEach, thread is pool-1-thread-1 
c forEach, thread is pool-1-thread-1 
d forEach, thread is pool-1-thread-1 
e forEach, thread is pool-1-thread-1 
f forEach, thread is pool-1-thread-1 
g forEach, thread is pool-1-thread-1 

しかし:

a forEach, thread is pool-1-thread-1 
b forEach, thread is pool-1-thread-2 
c forEach, thread is pool-1-thread-3 
d forEach, thread is pool-1-thread-4 
e forEach, thread is pool-1-thread-5 
f forEach, thread is pool-1-thread-6 
g forEach, thread is pool-1-thread-7 

ことができます誰かがそれを起こす方法を教えてくれますか?

答えて

4

パラレルスレッドの項目を読み込むには、パラレルオペレータを提供するObservableの代わりにFlowable <>を使用してください。例えば :

Flowable.fromIterable(letters) 
     .parallel(letters.size()) 
     .runOn(Schedulers.from(fixedThreadPool)) 
     .sequential() 
     .forEach(data -> System.out.println(data + " forEach, thread is " + 
          Thread.currentThread().getName())); 

もしスレッドのいずれかが各呼び出しのために使用される予測できないように、出力が変化してもよいです。私のテストケースでは、私はより多くの情報については

c forEach, thread is pool-1-thread-3 
g forEach, thread is pool-1-thread-7 
a forEach, thread is pool-1-thread-1 
e forEach, thread is pool-1-thread-5 
d forEach, thread is pool-1-thread-4 
b forEach, thread is pool-1-thread-2 
f forEach, thread is pool-1-thread-6 

を得た、 `)(`並行して `` .toFlowable(BackpressureStrategy.MISSINGを)しないでくださいthe parallel-flows section of the RxJava Wiki

+2

に相談してください! iterableを持っている場合は、なぜ 'Flowable.fromIterable'を使わないで、背圧を無料で使うのでしょうか? – akarnokd

+0

ポイントは@akarnokdです。コードフラグメントを変更します – Zapodot

関連する問題