2017-06-15 18 views
0

私はBlocking ConsumerをReactor Aluminum-SR1のFluxサブスクライバとして統合しようとしています。私は並列のスケジューラを使用してブロック操作を同時に実行したいと思います。FluxのpublishOnが期待どおりに動作しない

私は私の意図を記述するために、メインクラスを実装しました:

package etienne.peiniau; 

import org.reactivestreams.Subscriber; 
import org.reactivestreams.Subscription; 
import reactor.core.publisher.Flux; 
import reactor.core.scheduler.Schedulers; 
import reactor.util.function.Tuple2; 

public class Main { 

    public static void main(String[] args) throws InterruptedException { 
     Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) 
       .elapsed() 
       .publishOn(Schedulers.parallel()) 
       .subscribe(new Subscriber<Tuple2<Long, Integer>>() { 
        @Override 
        public void onSubscribe(Subscription subscription) { 
         System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
         subscription.request(Long.MAX_VALUE); 
        } 

        @Override 
        public void onNext(Tuple2<Long, Integer> t2) { 
         System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
         try { 
          Thread.sleep(1000); // long operation 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
        } 

        @Override 
        public void onComplete() { 
         System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
        } 
       }); 
     // Waiting for the program to complete 
     System.out.println("[" + Thread.currentThread().getName() + "] Main"); 
     Thread.sleep(100000); 
    } 

} 

このコードの出力は以下の通りです:

[main] Subscription 
[main] Main 
[parallel-1] [3,1] 
[parallel-1] [1000,2] 
[parallel-1] [1001,3] 
[parallel-1] [1000,4] 
[parallel-1] [1000,5] 
[parallel-1] [1000,6] 
[parallel-1] [1001,7] 
[parallel-1] [1000,8] 
[parallel-1] [1000,9] 
[parallel-1] [1000,10] 
[parallel-1] [1000,11] 
[parallel-1] [1001,12] 
[parallel-1] [1000,13] 
[parallel-1] [1000,14] 
[parallel-1] [1000,15] 
[parallel-1] [1000,16] 
[parallel-1] [1001,17] 
[parallel-1] [1000,18] 
[parallel-1] [1000,19] 
[parallel-1] [1000,20] 
[parallel-1] Complete 

私の問題は、長い操作は常にオンに実行されていることですスレッドはパラレル1で1秒ごとに実行されます。

私は並列処理を手動で増やすか、弾性スケジューラを使用しようとしましたが、結果は同じです。

私はpublishOnメソッドがこのユースケースのために特別に設計されたと考えていました。私が何かを誤解したかどうか教えていただけますか?

答えて

2

実際には、期待通りに動作しますが、並列実行時間で処理されたすべての値はほぼ同じですが、常に1秒待つたびに同じスレッド内の要素を受け取ることがわかります。

私は単純にFluxというパラレルは、より多くのスレッドを意味するとは思わない、それは並行して作業を行うことを意味します。あなたは、たとえば次のようなコードを実行した場合:

map [single-1] 0 
map [single-1] 1 
... 
subscribe [single-1] [4,0] 
subscribe [single-1] [0,1] 
... 

をそして、あなたはそれらをそれは最初、それはすべての要素のmapを行う参照して、consumeことができます。

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
      .map(i -> { 
       System.out.println("map [" + Thread.currentThread().getName() + "] " + i); 
       return i; 
      }) 
      .elapsed() 
      .publishOn(Schedulers.single()) 
      .subscribeOn(Schedulers.single()) 
      .subscribe(t2 -> { 
       System.out.println("subscribe [" + Thread.currentThread().getName() + "] " + t2); 
      }); 

あなたは結果が表示されます。しかし、あなたは一例.publishOn(Schedulers.parallel())のために変更する場合は、表示されます。

map [single-1] 3 
subscribe [parallel-1] [5,0] 
map [single-1] 4 
subscribe [parallel-1] [0,1] 
map [single-1] 5 
... 

をだから、一度に並列スレッドの両方の操作を行います。私はすべてが正しく理解されているかどうかはわかりません。

並列実行には特定のParallelFluxがあります。例ではすべてが別のスレッドで実行されます下に

Flux.fromIterable(IntStream.range(0, 20).boxed().collect(Collectors.toList())) 
     .elapsed() 
     .parallel() 
     .runOn(Schedulers.parallel()) 
     .subscribe(t2 -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] " + t2); 
      try { 
       Thread.sleep(1000); // long operation 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     }, throwable -> { 
      System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage()); 
     },() -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Complete"); 
     }, subscription -> { 
      System.out.println("[" + Thread.currentThread().getName() + "] Subscription"); 
      subscription.request(Long.MAX_VALUE); 
     }); 

結果は次のようになります。

[parallel-1] [8,0] 
[parallel-2] [0,1] 
[parallel-3] [0,2] 
[parallel-4] [0,3] 
[parallel-1] [0,4] 
... 

は、だから、結果を処理するために、いくつかのスレッドを使用しています。それは私の視点では本当に並行しています。

はまた、あなたがメソッド.subscribe(Subscriber<? super T> s)を使用する場合、すべての結果がシーケンシャルな方法で消費され、あなたが使用する必要があり、それらを並行して消費することに注意してください:Consumer<? super T> onNext,...引数

+0

@etiennepeiniauと

public void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete, Consumer<? super Subscription> onSubscribe) 

またはその他のオーバーロードされたメソッド私は単純なFluxでは "並列==もっとスレッド"ではないと思います。私は私の答えを編集し、例を追加します。 –

関連する問題