を消費します。 のは、私がトリガーごとに1秒するタスク(tasks
)のリストがあるとしましょう。 scheduler
か何かで行うことができるは私が完了しようとしているタスクのための簡単な例に取り組んでいます異なる周波数
。すべてのタスクのすべてのn番目の完了時にトリガすべき
今、このストリームの2人の消費者、があるが、
C1
は、すべてのタスクC2
の完了時にトリガする必要があります。
(また、n秒ごとことができます)ここではいくつかのサンプルコードです。現在、それは繰り返す予定ではありません - Observable.repeat
またはScheduler
を使用する方が良いかどうかわからないためです。
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._
object MainTest {
def main(args: Array[String]): Unit = {
def t = (i: Int) => Observable.eval {
print(i)
i
}
val tsks = (1 to 5).map(t)
val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))
val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))
val s = tasks.reduce(_ + _).publish
s.consumeWith(c1).runAsync
s.consumeWith(c2).runAsync
s.connect()
while (true) {
Thread.sleep(1.hour.toMillis)
}
}
}
私は今、はっきりと私の難しさを見ることができると思います。 1秒ごとに観測可能な状態で私の 'source'を' flatMap'すれば、このストリームはもはや有限ではありません。だから私は自分にとって重要な「サイクル」情報を完全に失ってしまった。'flatMap'の代わりに' map'を実行すると、それを保持しますが、Observable [Observable [Int]]を取得します。 そして、その流れを管理することはより困難です。 – Atais
上記のコメントは私がこの事件を理解するのを助けました。私のサイクルはObservableのアイテムでなければなりません。そしてBobは私の叔父です。 – Atais