2016-04-10 14 views
1

私は、単にストリームをテレビに投げるストリーミングTwitterクライアントを作成しています。私はRxJavaでストリームを観察しています。RxJavaイベントのバーストを円滑にするために観察可能

ストリームがバースト的になったとき、私はそれをバッファして遅くして、各ツイートが少なくとも6秒間表示されるようにします。その後、静かな時間の間に、ビルドされたバッファは、キューの先頭を6秒ごとに1つずつつかむことによって、徐々に空になります。新しいツイートが入ってきて、空のキューに直面している場合(ただし最後が表示されてから> 6秒後)、すぐに表示されるようにします。

は、私はそのように見ているストリームがhereを説明した想像:

Raw:  --oooo--------------ooooo-----oo----------------ooo| 
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o| 

そして、私は疑問が解決策が持ってポーズをとったことを理解しています。しかし、私はちょうどその答えの周りに私の頭を包むことはできません。ここに私の解決策があります:

私の質問です。これはアプローチがあまりにも単純ですか?バッファリング/バックプレッシャーはどこで起こっていますか?より良い解決策はありますか?

答えて

2

メッセージが以前のメッセージに比べてあまりにも早すぎると、メッセージを遅延させたいように見えます。あなたはそれの後に、新たな発光が最後の目標発光時間を追跡し、スケジュールする必要があります。

public class SpanOutV2 { 
    public static void main(String[] args) { 
     Observable<Integer> source = Observable.just(0, 5, 13) 
       .concatMapEager(v -> Observable.just(v).delay(v, TimeUnit.SECONDS)); 

     long minSpan = 6; 
     TimeUnit unit = TimeUnit.SECONDS; 
     Scheduler scheduler = Schedulers.computation(); 

     long minSpanMillis = TimeUnit.MILLISECONDS.convert(minSpan, unit); 

     Observable.defer(() -> { 
      AtomicLong lastEmission = new AtomicLong(); 

      return source 
      .concatMapEager(v -> { 
       long now = scheduler.now(); 
       long emission = lastEmission.get(); 

       if (emission + minSpanMillis > now) { 
        lastEmission.set(emission + minSpanMillis); 
        return Observable.just(v).delay(emission + minSpanMillis - now, TimeUnit.MILLISECONDS); 
       } 
       lastEmission.set(now); 
       return Observable.just(v); 
      }); 
     }) 
     .timeInterval() 
     .toBlocking() 
     .subscribe(System.out::println); 
    } 
} 

ここで、ソースは、問題の開始からの相対秒数だけ遅れています。 0はすぐに到着し、5はT = 6秒に到着し、13は@ T = 13に到着するはずである。concatMapEagerは、順序とタイミングが保持されることを保証する。標準的なオペレータだけが使用されているので、バックプレッシャおよびアンサブスクリプションは自然に構成されます。

+0

はい!これは間違いなく私の問題を解決するように見えます。好奇心の念から、(あなたのやったように)メッセージの後に(私がやったように)遅れてメッセージを遅らせることに利点がありますか?両方のソリューションがうまくいくと思います。たぶん私は理解していない微妙なRxの違いがあります。 – dgmltn

+0

オペレータの関与が少なく、オーバーヘッドが少ない。 – akarnokd