2017-08-02 9 views
0

私はakka-streamのライフサイクルを監視したいと思っています。monitorは私の必要とするものですが、私の監視機能は非同期で、Futureを返すので、モニターが必要ですasyncも同様です。akkaストリームのライフサイクルを監視する

monitorは、次のシグネチャがあります。

def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Mat2): ReprMat[Out, Mat2] 

をしかし、私はのような何か必要があります:

def monitorAsync[Mat2]()(combine: (Mat, FlowMonitor[Out]) ⇒ Future[Mat2]): ReprMat[Out, Mat2] 

mapAsyncのように、この使用アッカ・ストリームのプリミティブを実装する方法はあります。

私はmapAsync + watchTerminationを使用することができますが、monitorがほぼ必要なときには複雑な解決策のようです。

+1

我々は、「フラット・マッピング」のオープンチケットを持っている未来はあなたがここに従うことができる値をマテリアライズド:https://github.com/akka/akka/issues/ 23303 – johanandren

答えて

0

monitorは、ストリームのマテリアライゼーション後にはFlowMonitorにしかアクセスできないため、私が望むものではありませんでした。

mapAsyncrecoverでこれを実装しました。私はここに簡素化していますが、このような何か:

val monitor = new Monitor { 
    def onNext: Future[Unit] = ??? 
    def onFailure(cause: Throwable): Future[Unit] = ??? 
    def onFinish: Future[Unit] = ??? 
} 

source.mapAsync { v => 
    monitor.onNext.map(_ => v) 
}.watchTermination() { (mat, doneF) => 
    doneF.flatMap(_ => monitor.onFinish).recoverWith(case ex => monitor.onFailure(ex)) 
} 
関連する問題