2017-09-18 16 views
0

私はRxJavaのパフォーマンスを順次計算(私が想定していたもの)と比較して計算をブロックしようとしています。RxJavaの並列計算

私はthis postthis SO questionを見ていました。経験から、System.currentTimeMillis()とThread.sleep()を使用したベンチマークではI/Oではなく計算を処理する際に一貫した結果が得られないので、代わりに単純なJMHベンチマークを設定しようとしました。

私のベンチマークは、2つのintを計算し、それらを追加します。私は結果で困惑しています

public class MyBenchmark { 

    private Worker workerSequential; 
    private Worker workerParallel; 

    private int semiIntenseCalculation(int i) { 
     Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i))))))))))))))))); 
     return d.intValue() + i; 
    } 

    private int nonIntenseCalculation(int i) { 
     Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i))))))); 
     return d.intValue() + i; 
    } 


    private Observable<Object> intensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 

       int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101); 
       Integer i = semiIntenseCalculation(randomNumforSemi); 
       int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101); 
       Integer j = nonIntenseCalculation(randomNumforNon); 

       return i+j; 
      } 
     }); 
    }; 

    private Observable<Object> semiIntensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 
       int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101); 
       return semiIntenseCalculation(randomNumforSemi); 
      } 
     }); 
    }; 

    private Observable<Object> nonIntensiveObservable() { 
     return Observable.fromCallable(new Callable<Object>() { 
      @Override 
      public Object call() throws Exception { 
       int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101); 
       return nonIntenseCalculation(randomNumforNon); 
      } 
     }); 
    }; 

    public interface Worker { 
     void work(); 
    } 

    @Setup 
    public void setup(final Blackhole bh) { 

     workerSequential = new Worker() { 

      @Override 
      public void work() { 


       Observable.just(intensiveObservable()) 
        .subscribe(new Subscriber<Object>() { 

        @Override 
        public void onError(Throwable error) { 

        } 

        @Override 
        public void onCompleted() { 

        } 

        @Override 
        public void onNext(Object arg) { 
         bh.consume(arg); 

        } 
       }); 
      } 
     }; 

     workerParallel = new Worker() { 
      @Override 
      public void work() { 
       Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()), 
           nonIntensiveObservable().subscribeOn(Schedulers.computation()), 
           new Func2<Object, Object, Object>() { 

        @Override 
        public Object call(Object semiIntensive, Object nonIntensive) { 
         return (Integer)semiIntensive + (Integer)nonIntensive; 
        } 

       }).subscribe(bh::consume); 
      } 
     }; 

    } 

    @Benchmark 
    public void calculateSequential() { 
     workerSequential.work(); 
    } 

    @Benchmark 
    public void calculateParallel() { 
     workerParallel.work(); 
    } 
} 

# Run complete. Total time: 00:00:21 

Benchmark      Mode Cnt  Score  Error Units 
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op 
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op 

は明らかに私が速くなるように並列計算を期待していました。 RxJavaはパラレルI/Oにのみ適していますか、なぜこれらの結果が得られますか?

答えて

1

ベンチマークが間違っています。並列処理が完了するのを待つ必要があります(blockingSubscribe経由で)かなり多くのGCオーバーヘッドが追加され、エグゼキュータの内部キューも膨らみます。

Hereは、さまざまな並行作業を測定するための基準ベンチマークです。作業をディスパッチするのにはオーバヘッドがあります。並列設定で作業項目ごとに500以上のサイクルがない限り、このようなフォーク結合タイプの並列作業負荷は改善されません。

+0

恐ろしく!これは、ディスパッチオーバーヘッドと私が関心のあるマルチスレッドの従来のJava実装との比較です。ParallelPerf.javaは素晴らしい出発点です! – ChopperOnDick

関連する問題