2016-05-18 10 views
3

私は以下を試みました。RxJavaのonBackpressureDrop()は項目を削除しますか?

public static void main(String[] args) throws Exception { 
    Observable.interval(100L, TimeUnit.MILLISECONDS) 
    .onBackpressureDrop().subscribe(new Subscriber<Long>() { 

    @Override 
    public void onStart() { 
     request(1L); 
    } 

    @Override 
    public void onNext(Long t) { 
     System.out.println("received: " + t); 
     try { 
     Thread.sleep(1000L); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
     request(1); 
    } 

    @Override 
    public void onCompleted() { 
     System.out.println("onCompleted"); 
    } 

    @Override 
    public void onError(Throwable e) { 
     e.printStackTrace(); 
    } 
    }); 

    TimeUnit.SECONDS.sleep(10L); 
} 

私はonBackpressureDropメソッドが要求する前にアイテムをドロップなるので、加入者がアイテム0, 10, 20, 30...を受けるであろうと予想しました。 結果は0, 1, 2, 3...でした。

また、onBackpressureLatest()を試しましたが、結果は同じ0, 1, 2, 3...でした。

これは私にはonBackpressureBufferと同じように見えます。

私はこの方法を誤解していますか? もしそうなら、私はどのようにしてonBackpressureDropメソッドをJavadocの大理石図として使うのですか?

答えて

1

onBackpressureDropこの特定のケースでは「バックプレッシャー」がないため、ここでは何も行いません。 subscribeメソッドで行うことは、Observable.intervalメソッドによって新しい値が作成されたスレッドをブロックし、各エミッション後に1秒間スリープさせることです。したがって、Observable.intervalは100ミリ秒ごとに値を生成しませんが、これを1秒に1回だけ実行する必要があります。

私はあなたがonBackpressureDropsubscribeの値の生成、あなたは物事が異なるスレッド上にあるようなsubscribeobserveOnを置くことをお勧めします。エフェクトを確認するには、小さいバッファサイズ(例:1)のthis overloadを使用します。どんな種類のスケジューラが私が知る限りはやるだろう。

+0

私は疲れて、私は欲しいものを手に入れました。だから、問題は私も排出を阻止したことです。今分かります。どうもありがとうございました。 – otal

1

これは、interval()がスリープするように動作しているスレッドを置くために起こります。一つの可能​​な解決策 - 追加observeOn(スケジューラ、bufferSizeの)オペレータ:

を受信:0
最小限のバッファサイズで

Observable.interval(100L, TimeUnit.MILLISECONDS) 
      .onBackpressureDrop() 
      .observeOn(Schedulers.io(), 1) 
      .subscribe(new Subscriber<Long>() { 

       @Override 
       public void onStart() { 
        request(1L); 
       } 

       @Override 
       public void onNext(Long t) { 
        System.out.println("received: " + t); 
        try { 
         Thread.sleep(1000L); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
        request(1); 
       } 

       @Override 
       public void onCompleted() { 
        System.out.println("onCompleted"); 
       } 

       @Override 
       public void onError(Throwable e) { 
        e.printStackTrace(); 
       } 
      }); 

    Observable.empty().delay(1, TimeUnit.MINUTES).toBlocking().subscribe(); 

あなたの加入者がIO上で動作することの方法()、スケジューラ、および背圧は動作するようです受け取った:11
受信:22

+0

私は試してみたいと思っています。ご協力ありがとうございました!また、bufferSizeを 'observeOn'に入れた場合、Subscriberで' request'を呼び出す必要はありません。バッファサイズの 'observeOn'メソッドは、内部でSubscriberの' request'メソッドを呼び出すようです。 – otal

関連する問題