2017-01-25 11 views
1

sereialize()をテストするために以下を試しました。RxJava 2.x:serialize()が機能しない

私はonNextを1,000,000回呼び出して、2つの異なるスレッドから数えました。 次に、私はonCompleteに2,000,000を得ることを期待しました。

しかし、私は期待値を得ることができませんでした。

private static int count = 0; 

private static void setCount(int value) { 
    count = value; 
} 

private static final int TEST_LOOP = 10; 

private static final int NEXT_LOOP = 1_000_000; 

@Test 
public void test() throws Exception { 

    for (int test = 0; test < TEST_LOOP; test++) { 
    Flowable.create(emitter -> { 
     ExecutorService service = Executors.newCachedThreadPool(); 
     emitter.setCancellable(() -> service.shutdown()); 

     Future<Boolean> future1 = service.submit(() -> { 
     for (int i = 0; i < NEXT_LOOP; i++) { 
      emitter.onNext(i); 
     } 
     return true; 
     }); 

     Future<Boolean> future2 = service.submit(() -> { 
     for (int i = 0; i < NEXT_LOOP; i++) { 
      emitter.onNext(i); 
     } 
     return true; 
     }); 

     if (future1.get(1, TimeUnit.SECONDS) 
      && future2.get(1, TimeUnit.SECONDS)) { 
     emitter.onComplete(); 
     } 
    }, BackpressureStrategy.BUFFER) 
     .serialize() 
     .cast(Integer.class) 
     .subscribe(new Subscriber<Integer>() { 

      private int count = 0; 

      @Override 
      public void onSubscribe(Subscription s) { 
      s.request(Long.MAX_VALUE); 
      } 

      @Override 
      public void onNext(Integer t) { 
      count++; 
      } 

      @Override 
      public void onError(Throwable t) { 
      fail(t.getMessage()); 
      } 

      @Override 
      public void onComplete() { 
      setCount(count); 
      } 
     }); 

    assertThat(count, is(NEXT_LOOP * 2)); 
    } 
} 

私はserialize()が動作しないか、私はSerializedSubscriberのソースを確認しserialize()

の使用をmissunderstoodだろうか。 actual.onNext(t);以来

@Override 
public void onNext(T t) { 
    ... 
    synchronized(this){ 
    ... 
    } 
    actual.onNext(t); 
    emitLoop(); 
} 

は、同期ブロックの外に呼ばれて、私はactual.onNext(t);が同時に異なるスレッドから呼び出すことができることを推測します。また、に電話をしてonNextを実行する前にも可能です。

私はRxJava 2.0.4を使用しました。

+0

あなたはプロジェクトにバグを報告する必要があります:https://github.com/ReactiveX/RxJava/issues/new – spierce7

+0

@ spierce7私が問題を作る前に、私はバグを確認したいと思います。その使い方の誤解。私は英語がうまくない。 – arching

答えて

1

これはバグではなくFlowableEmittermisuseではありません。

onNext、のonErrorとonCompleteの方法は、単に加入者の方法のように、逐次的に呼び出す必要があります。これを確実にするには、serialize()を使用してください。他のメソッドはスレッドセーフです。 Flowable.serialize()を適用

FlowableEmitter.serialize()

createオペレータのために遅すぎます。

関連する問題