sereialize()
をテストするために以下を試しました。RxJava 2.x:serialize()が機能しない
私はonNext
を1,000,000回呼び出して、2つの異なるスレッドから数えました。 次に、私はonComplete
に2,000,000を得ることを期待しました。
しかし、私は期待値を得ることができませんでした。
private static int count = 0;
private static void setCount(int value) {
count = value;
}
private static final int TEST_LOOP = 10;
private static final int NEXT_LOOP = 1_000_000;
@Test
public void test() throws Exception {
for (int test = 0; test < TEST_LOOP; test++) {
Flowable.create(emitter -> {
ExecutorService service = Executors.newCachedThreadPool();
emitter.setCancellable(() -> service.shutdown());
Future<Boolean> future1 = service.submit(() -> {
for (int i = 0; i < NEXT_LOOP; i++) {
emitter.onNext(i);
}
return true;
});
Future<Boolean> future2 = service.submit(() -> {
for (int i = 0; i < NEXT_LOOP; i++) {
emitter.onNext(i);
}
return true;
});
if (future1.get(1, TimeUnit.SECONDS)
&& future2.get(1, TimeUnit.SECONDS)) {
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.serialize()
.cast(Integer.class)
.subscribe(new Subscriber<Integer>() {
private int count = 0;
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer t) {
count++;
}
@Override
public void onError(Throwable t) {
fail(t.getMessage());
}
@Override
public void onComplete() {
setCount(count);
}
});
assertThat(count, is(NEXT_LOOP * 2));
}
}
私はserialize()
が動作しないか、私はSerializedSubscriberのソースを確認しserialize()
の使用をmissunderstoodだろうか。 actual.onNext(t);
以来
@Override
public void onNext(T t) {
...
synchronized(this){
...
}
actual.onNext(t);
emitLoop();
}
は、同期ブロックの外に呼ばれて、私はactual.onNext(t);
が同時に異なるスレッドから呼び出すことができることを推測します。また、に電話をしてonNext
を実行する前にも可能です。
私はRxJava 2.0.4を使用しました。
あなたはプロジェクトにバグを報告する必要があります:https://github.com/ReactiveX/RxJava/issues/new – spierce7
@ spierce7私が問題を作る前に、私はバグを確認したいと思います。その使い方の誤解。私は英語がうまくない。 – arching