0
私はRxJavaのパフォーマンスを順次計算(私が想定していたもの)と比較して計算をブロックしようとしています。RxJavaの並列計算
私はthis postとthis SO questionを見ていました。経験から、System.currentTimeMillis()とThread.sleep()を使用したベンチマークではI/Oではなく計算を処理する際に一貫した結果が得られないので、代わりに単純なJMHベンチマークを設定しようとしました。
私のベンチマークは、2つのintを計算し、それらを追加します。私は結果で困惑しています
public class MyBenchmark {
private Worker workerSequential;
private Worker workerParallel;
private int semiIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(i)))))))))))))))));
return d.intValue() + i;
}
private int nonIntenseCalculation(int i) {
Double d = Math.tan(Math.atan(Math.tan(Math.atan(Math.tan(Math.tan(Math.atan(i)))))));
return d.intValue() + i;
}
private Observable<Object> intensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
Integer i = semiIntenseCalculation(randomNumforSemi);
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
Integer j = nonIntenseCalculation(randomNumforNon);
return i+j;
}
});
};
private Observable<Object> semiIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforSemi = ThreadLocalRandom.current().nextInt(0, 101);
return semiIntenseCalculation(randomNumforSemi);
}
});
};
private Observable<Object> nonIntensiveObservable() {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
int randomNumforNon = ThreadLocalRandom.current().nextInt(0, 101);
return nonIntenseCalculation(randomNumforNon);
}
});
};
public interface Worker {
void work();
}
@Setup
public void setup(final Blackhole bh) {
workerSequential = new Worker() {
@Override
public void work() {
Observable.just(intensiveObservable())
.subscribe(new Subscriber<Object>() {
@Override
public void onError(Throwable error) {
}
@Override
public void onCompleted() {
}
@Override
public void onNext(Object arg) {
bh.consume(arg);
}
});
}
};
workerParallel = new Worker() {
@Override
public void work() {
Observable.zip(semiIntensiveObservable().subscribeOn(Schedulers.computation()),
nonIntensiveObservable().subscribeOn(Schedulers.computation()),
new Func2<Object, Object, Object>() {
@Override
public Object call(Object semiIntensive, Object nonIntensive) {
return (Integer)semiIntensive + (Integer)nonIntensive;
}
}).subscribe(bh::consume);
}
};
}
@Benchmark
public void calculateSequential() {
workerSequential.work();
}
@Benchmark
public void calculateParallel() {
workerParallel.work();
}
}
:
# Run complete. Total time: 00:00:21
Benchmark Mode Cnt Score Error Units
MyBenchmark.calculateParallel avgt 5 15602,176 ± 1663,650 ns/op
MyBenchmark.calculateSequential avgt 5 288,128 ± 6,982 ns/op
は明らかに私が速くなるように並列計算を期待していました。 RxJavaはパラレルI/Oにのみ適していますか、なぜこれらの結果が得られますか?
恐ろしく!これは、ディスパッチオーバーヘッドと私が関心のあるマルチスレッドの従来のJava実装との比較です。ParallelPerf.javaは素晴らしい出発点です! – ChopperOnDick