2017-11-03 4 views
0

を消費します。 のは、私がトリガーごとに1秒するタスク(tasks)のリストがあるとしましょう。 schedulerか何かで行うことができるは私が完了しようとしているタスクのための簡単な例に取り組んでいます異なる周波数

。すべてのタスクのすべてのn番目の完了時にトリガすべき

今、このストリームの2人の消費者、があるが、

  • C1は、すべてのタスク
  • C2の完了時にトリガする必要があります。

(また、n秒ごとことができます)ここではいくつかのサンプルコードです。現在、それは繰り返す予定ではありません - Observable.repeatまたはSchedulerを使用する方が良いかどうかわからないためです。

import monix.eval.Task 
import monix.execution.Scheduler.Implicits.global 
import monix.execution.Scheduler.{global => scheduler} 
import monix.reactive.{Consumer, Observable} 

import scala.concurrent.duration._ 

object MainTest { 

    def main(args: Array[String]): Unit = { 

    def t = (i: Int) => Observable.eval { 
     print(i) 
     i 
    } 

    val tsks = (1 to 5).map(t) 

    val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println(""))) 

    val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]")) 
    val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]")) 

    val s = tasks.reduce(_ + _).publish 

    s.consumeWith(c1).runAsync 
    s.consumeWith(c2).runAsync 
    s.connect() 

    while (true) { 
     Thread.sleep(1.hour.toMillis) 
    } 
    } 

} 

答えて

2

まず第一に、1秒ごとに作業を繰り返すため、あなたはすべてのタスクの完了時にトリガするために...

Observable.intervalAtFixedRate(1.second) 
    .flatMap(_ => Observable.eval(???)) 

を行うことができ、あなたがしたい場合、あなたはどちらかcompletedを(使用することができます唯一の最後の完了イベントを発するObservable[Nothing])またはcompletedL(あなたが代わりにTask[Unit]で作業する場合)。 詳細については、API docsを参照してください。だからではなく、あなたのc1事の

、あなたが行うことができます:

  • sample(別名throttleLast
  • sampleRepeated
  • s.completeL.runAsync 
    

    をしかし、ソースをサンプリングするために、あなたが作業することができます

  • throttleFirst
  • debounce
  • debounceRepeated
  • echo
  • echoRepeated

私は、これらの、API docsで始めてプレイすることをお勧めします。

s.sample(10.seconds).doOnNext(println).completedL.runAsync 

それとも、単にtakeEveryNthを持つすべてのN番目の要素を取ることができます。

s.takeEveryNth(20).doOnNext(println).completedL.runAsync 

が、これは、あなたの質問に答えるなら、私に教えてください。

+0

私は今、はっきりと私の難しさを見ることができると思います。 1秒ごとに観測可能な状態で私の 'source'を' flatMap'すれば、このストリームはもはや有限ではありません。だから私は自分にとって重要な「サイクル」情報を完全に失ってしまった。'flatMap'の代わりに' map'を実行すると、それを保持しますが、Observable [Observable [Int]]を取得します。 そして、その流れを管理することはより困難です。 – Atais

+0

上記のコメントは私がこの事件を理解するのを助けました。私のサイクルはObservableのアイテムでなければなりません。そしてBobは私の叔父です。 – Atais

関連する問題