2017-11-18 6 views
1

いくつかのオブジェクト(同じParentオブジェクトを拡張している)を放出する2つの別々のストリームをマージして、バッファオペレータを使用してバッファリングし、10秒後に収集したデータを放出することをシミュレートできます。このマージ/バッファは、2つの分離されたストリームからのエミッションがあるたびに常に呼び出されるように、このメカニズムを無限にしたいと考えています。ここで 2つのInfiite Observableストリームをシミュレートする方法と、それらをマージして10秒ごとにバッファリングする他のObservableを持つ方法はありますか?

は、私がこれまで何をやったかである:

val list1 = mutableListOf<SomeClass1>(
      SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3), 
      SomeClass1("4", 4), SomeClass1("5", 5), SomeClass1("6", 6), 
      SomeClass1("7", 7), SomeClass1("8", 8), SomeClass1("9", 9) 
    ) 
    val list2 = mutableListOf<SomeClass2>(
      SomeClass2(1.00), SomeClass2(2.00), SomeClass2(3.00), 
      SomeClass2(4.00), SomeClass2(5.00), SomeClass2(6.00), 
      SomeClass2(7.00), SomeClass2(8.00), SomeClass2(9.00) 
    ) 

    val someClass1Observable = Observable 
      .fromIterable(list1) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass1, _: Long -> item }) 


    val someClass2Observable = Observable 
      .fromIterable(list2) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass2, _: Long -> item }) 


    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 


    Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .repeat() 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

    Thread.sleep(30000) 
    Log.v("AFTER_SLEEP", "AFTER_SLEEP") 


    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 

2つのストリームの第一の発光が正常に動作しますが、観察可能なマージ/バッファは10秒後に、そこから毎回排出量を集めています。しかし、それらの2つのストリームが排出を終了し、それらを再び購読すると、Observableのバッファー/マージはロガーの働きがありません。この作品を無限のようにいかに持つか?リストから値を読み取る必要がないオブジェクトを放出する2つの別々のストリームのコードを書く良い方法はありますか?代わりに、2秒間隔ごとに新しいオブジェクトを放出するでしょうか? マージ/バッファを作る方法Observable worksは無限で、2つの観測可能なストリームから新しいエミッションがあるときはいつでもそれを意味しますか?

答えて

0

それらSomeClassは無限ストリーム、あなたは、単にそれらを使用する前に、それらに.repeat()オペレータを置くことができる、またはあなたがintervalからの要求に応じてオブジェクトを構築することができようにするには:

val someClass1Obs = Observable 
    .interval(2, TimeUnit.SECONDS) 
    .map { SomeClass1("$it", it.toInt()) } // <-- create objects on demand 

私はこれがあなたの他の問題を解決するかもしれないと思いますあまりにも。

関連する問題