2017-09-02 25 views
1

反応的なx(理想的にはRxJavaまたはRxJの例を使用)はこれを達成できますか?有限ストリームの無限ストリームを無限ストリームに変換する - 反応的なX

a |-a-------------------a-----------a-----------a---- 
s1 |-x-x-x-x-x-x -| (subscribe) 
s2      |-x-x-x-x-x-| (subscribe) 
s2            |-x-x-x-x-x-| (subscribe) 
... 
sn 
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe) 

a(加算動作を実行するために)各snストリームにしかしで購読することができながら、無限ストリームSの一部であるべき各々がイベントの有限ストリームsnをトリガするイベントの無限ストリームであります同じ時間にストリームSを無限に保ちます。

編集:より具体的には、私がKotlinで探しているものの実装を提供します。 4イベントの共有有限ストリームにマップするイベントが10秒ごとに発行されます。メタストリームはflatMap - 通常の無限ストリームに変換されます。私はdoAfterNextを利用して、それぞれの有限ストリームに加入して結果をプリントアウトします。

/** Creates a finite stream with events 
* $ch-1 - $ch-4 
*/ 
fun createFinite(ch: Char): Observable<String> = 
     Observable.interval(1, TimeUnit.SECONDS) 
       .take(4) 
       .map({ "$ch-$it" }).share() 

fun main(args: Array<String>) { 

    var ch = 'A' 

    Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
      .map { createFinite(ch++) } 
      .doAfterNext { 
       it 
         .count() 
         .subscribe({ c -> println("I am done. Total event count is $c") }) 
      } 
      .flatMap { it } 
      .subscribe { println("Just received [$it] from the infinite stream ") } 

    // Let main thread wait forever 
    CountDownLatch(1).await() 
} 

しかし、これが「純粋なRX」であるかどうかはわかりません。

+1

これは 'concatMap'のように見えますが、どのように各イベントをN個の内部ソースのセットにマップするのかは不明です。 – akarnokd

+1

これまでに試したことの例を追加すると、達成しようとしていることがわかります。 – paulpdaniels

+0

http://i0.kym-cdn.com/photos/images/original/000/173/576/Wat8.jpg - 私はタイトルを読んでいます – inf

答えて

0

どのようにカウントしたいのか分かりません。あなたはトータルカウントを行っている場合は、内部サブスクリプション行う必要はありません。一方

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it } 
     .doOnNext(counter.incrementAndget()) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

は、各中間観察できるのカウントを提供する必要がある場合、あなたは内部のカウントを移動することができますflatMap()カウントをプリントアウトし、完了時にそれをリセット:

AtomicLong counter = new AtomicLong() 
Observable.interval(10, TimeUnit.SECONDS).startWith(0) 
     .map { createFinite(ch++) } 
     .flatMap { it 
        .doOnNext(counter.incrementAndget() 
        .doOnCompleted({ long ctr = counter.getAndSet(0) 
             println("I am done. Total event count is $ctr") 
            }) 
     .subscribe { println("Just received [$it] from the infinite stream ") } 

これは非常に機能的ではありませんが、報告のこの種は、通常のストリームを分割する傾向があります。

関連する問題