2017-03-13 60 views
0

Flowable BackpressureStrategyがblockingSubscribeメソッドとどのように機能するかを理解するのが難しいです。誰かが私にそれを説明することができたらうれしいです。RxJava2 Flowable.create blockingSubscribeセマンティクスとBackpressureStrategy

私はこのコードを現在のトランクのFlowableTestsファイルでテストしていました。

@Test 
public void testCreateBackpressureDrop() { 
    Flowable.create(new FlowableOnSubscribe<Integer>() { 
     @Override 
     public void subscribe(FlowableEmitter<Integer> e) throws Exception { 
      e.onNext(1); 
      e.onNext(3); 
      e.onNext(4); 
      e.onComplete(); 
     } 
    }, BackpressureStrategy.DROP).blockingSubscribe(w); 

    verify(w, times(1)).onNext(1); 
    verify(w, times(1)).onNext(3); 
    verify(w, times(1)).onNext(4); 
    verify(w, times(1)).onComplete(); 
} 

私はsubscribe(w)BackpressureStragegy.DROPまたはBackpressure.BUFFERテストパスを使用している場合。しかし、私がblockingSubscribe(w)を使用した場合、が呼び出されなかったと言うと、Backpressure.DROPが失敗して、Backpressure.BUFFERが失敗します。

ありがとうございます!

答えて

2

これはSubscriberを模擬するためにMockitoを使った典型的な問題です:あなたはそのonSubscriberequest(N)を呼び出す必要があり:

@SuppressWarnings("unchecked") 
public static <T> Subscriber<T> mockSubscriber() { 
    Subscriber<T> w = mock(Subscriber.class); 

    Mockito.doAnswer(new Answer<Object>() { 
     @Override 
     public Object answer(InvocationOnMock a) throws Throwable { 
      Subscription s = a.getArgument(0); 
      s.request(Long.MAX_VALUE); 
      return null; 
     } 
    }).when(w).onSubscribe((Subscription)any()); 

    return w; 
} 

blockingSubscribeとの合併症がFlowableOnSubscribeが実行された後に、それは上w.onSubscribeを実行していることです。

+0

通常のコード(非テストコード)の 'onSubscribe'で' s.request(N) 'が呼び出されると予想されますか?これについてはまだ役立つ資料はありますか?ありがとう! –

+0

あなたが 'Subscriber'を介してそれを実装している場合は、yesを返します。 'DisposableSubscriber'と' ResourceSubscriber'がそれを行います。 RxJava 1に精通している場合は、https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0を参照してください。そうでなければRxJava 2に関するブログがここにあります。 – akarnokd

関連する問題