2017-08-21 11 views
2

私はデータを出力するobservableを持っています。私は最初に3秒間それをバッファし、最初のバッファの後に1秒のスライドがなければなりません。これは、buffer(timespan,unit,skip)のようになります。スキップはタイムスパンにあります。RxJava Sliding Window

サンプル:

ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2) 

ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90} 

私はカスタムオペレータを作成することによって、これを達成することができます。私はカスタムオペレータに頼ることなくそれを行う方法があることを知りたいだけです。

答えて

0

私は組み込み演算子で解決できると思います。ホットまたは非軽量冷たいソースに使用する場合、物事はトリッキー得ることができますが、コードは、以下の方法のいずれかを示しています - 私は、教育/取得-アイデアのためにそれを使用することをお勧めではなく、生産使用を

@Test 
fun slidingWindow() { 
    val events = Flowable.just(
      Data(5, 1.0), 
      Data(10, 1.5), 
      Data(30, 2.8), 
      Data(40, 3.2), 
      Data(60, 3.8), 
      Data(90, 4.2)) 
      .observeOn(Schedulers.io()) 
    val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet() 
    Assert.assertNotNull(windows) 
    Assert.assertFalse(windows.isEmpty()) 
} 

fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
     from = 0, 
     to = windowSize, 
     slideSize = slideSize, 
     data = data) 

fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> { 
    val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data } 
    val tail = data.skipWhile { it.time <= from + slideSize } 
    val nonEmptyWindow = window.toList().filter { !it.isEmpty() } 
    val nextWindow = nonEmptyWindow.flatMapPublisher { 
     window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io()) 
    } 
    return nonEmptyWindow.toFlowable().concatWith(nextWindow) 
} 

data class Data(val data: Int, val time: Double) 

上記のテスト結果は
[[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]

+0

私の場合には望ましい解決に到着するのに役立ちます – Sagar