2017-05-17 4 views
5

以下に示すような非常に長い実行ストリームのストリームがあるとします。長い時間が過ぎると、不要になった多くのサブストリームが作成されます。連続したAkkaストリームでサブストリームをクリーンアップする方法

ID 3で作成されたサブを清掃する必要があり 例と13Pmで失わスキャン方式での状態 のために、与えられた時間に特定のサブをクリーンアップする方法はあります(WIDのプロパティを有効期限が切れます) ?

case class Wid(id: Int, v: String, expires: LocalDateTime) 
test("Substream with scan") { 
    val (pub, sub) = TestSource.probe[Wid] 
    .groupBy(Int.MaxValue, _.id) 
    .scan("")((a: String, b: Wid) => a + b.v) 
    .mergeSubstreams 
    .toMat(TestSink.probe[String])(Keep.both) 
    .run() 
} 

答えて

3

TL; DRあなたはいくつかの時間後にサブを閉じることができます。しかし、入力を使用してステージを組み込んで時間を動的に設定することは別の問題です。流れを閉じるにはサブ

を閉じる

、あなたは通常、(上流から)それを完了しますが、(下流から)それをキャンセルすることができます。たとえば、n要素が通過すると、take(n: Int)フローがキャンセルされます。

ここで、groupByの場合、すべてのサブストリームで共有されているのでサブストリームを完了できませんが、キャンセルすることができます。あなたはどのような状態にしたいのかによって決まります。

しかし、groupByが既にクローズされたサブフローのための入力を削除することに注意してください:ID 3持つ新しい要素が3 -substreamが閉じられた後groupBy、それは単に無視され、次には、上流から来る場合要素が引き込まれる理由は、おそらく、サブストリームの終了と再開の間のプロセスでいくつかの要素が失われる可能性があるからです。また、ストリームが非常に長い時間実行されることになっている場合、関連する(ライブ)サブストリームに転送される前に、各要素が閉じられたサブストリームのリストに対してチェックされるため、パフォーマンスに影響します。あなたがこれのパフォーマンスに満足していない場合は、独自のステートフルフィルタ(例えば、ブルームフィルタを使用)を実装したいかもしれません。

サブストリームを閉じるには、通常は、指定した数の要素だけが必要ですが、無限ストリームではそうでない場合があります)、または何らかの種類のタイムアウトを使用します。固定の場合はcompletionTimeoutマテリアライゼーションから閉鎖に至るまでの時間、またはしばらくの間エレメントが通っていないときにクローズしたい場合はidleTimeoutです。これらのフローはストリームをキャンセルしませんが失敗するので、recoverまたはrecoverWithステージで例外をキャッチして、エラーをキャンセルに変更する必要があります(recoverWith最後の要素を送信せずにキャンセルできるように、Source.emptyでリカバリする) 。動的にタイムアウトに

を設定

今、あなたが望むものを最初に通過要素に応じて動的に終了時間を設定することです。これは、ストリームの実体化がそれらを通過する要素とは独立しているため、より複雑です。実際、通常の場合(groupByなし)、ストリームはいずれかの要素がそれらを通過する前に実現されるため、要素を使用してそれらを実現するのは意味がありません。

私はthat questionで同様の問題があったが、それを定義したキーを使用して、すべてのサブを定義することができます署名

paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM]) 

groupByの修正版を使用して終了。これは、パラメータとしてキーの代わりに最初の要素を持つように変更できます。

別の(おそらくもっと簡単です)方法は、あなたが望むものを自分のステージで作成することです:最初の要素から終了時刻を取得し、その時刻にストリームをキャンセルします。ここではこれの実装例を示します(状態を設定する代わりにスケジューラを使用しました)。

object CancelAfterTimer 

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] { 
    val in = Inlet[T]("CancelAfter.in") 
    val out = Outlet[T]("CancelAfter.in") 
    override val shape: FlowShape[T, T] = FlowShape(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { 
    override def onPush(): Unit = { 
     val elem = grab(in) 
     if (!isTimerActive(CancelAfterTimer)) 
     scheduleOnce(CancelAfterTimer, getTimeout(elem)) 
     push(out, elem) 
    } 

    override def onTimer(timerKey: Any): Unit = 
     completeStage() //this will cancel the upstream and close the downstrean 

    override def onPull(): Unit = pull(in) 

    setHandlers(in, out, this) 
    } 
} 
関連する問題