2017-11-09 17 views
1

私は以下のグラフを持っています。Akka Streams - FlowShapeをジャンプする

inflateFlowステージでは、すでにDBで処理されているリクエストがあるかどうかを確認します。すでに処理されたメッセージがある場合は、MsgSuccessを返信し、RequestProcessは返信しませんが、次のFlowShapeは受け付けません。RequestProcessが必要です。どこでもEitherを追加することなくflowInflateからflowWrapにジャンプする方法はありますか?

GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 

    val flowInflate = builder.add(wrapFunctionInFlowShape[MsgRequest, RequestProcess](inflateFlow)) 
    val flowProcess = builder.add(wrapFunctionInFlowShape[RequestProcess, SuccessProcess](convertFlow)) 
    val flowWrite = builder.add(wrapFunctionInFlowShape[SuccessProcess, SuccessProcess](writeFlow)) 
    val flowWrap = builder.add(wrapFunctionInFlowShape[SuccessProcess, MsgSuccess](wrapFlow)) 

    flowInflate ~> flowProcess ~> flowWrite ~> flowWrap 

    FlowShape(flowInflate.in, flowWrap.out) 
} 

def wrapFunctionInFlowShape[Input, Output](f: Input => Output): Flow[Input, Output, NotUsed] = { 
    Flow.fromFunction { input => 
    f(input) 
    } 
} 

//check for cache 
def inflateFlow(msgRequest: MsgRequest): Either[RequestProcess, MsgSuccess] = { 
    val hash: String = hashMethod(msgRequest) 
    if(existisInDataBase(hash)) 
    Right(MsgSuccess(hash)) 
    else 
    Left(inflate(msgRequest)) 
} 

def convertFlow(requestPorocess: RequestPocess): SuccessProcess = {}//process the request} 
def writeFlow(successProcess: SuccessProcess): SuccessProcess = {}//write to DB} 
def wrapFlow(successProcess: SuccessProcess): MsgSuccess = {}//wrap and return the message} 

答えて

1

代替パスをパーティション内のストリームに定義できます。あなたの場合、Akka Stream ContribプロジェクトのPartitionWithステージが役に立ちます。標準のAkka Streams APIのPartitionステージとは異なり、PartitionWithは出力タイプを異ならせることができます:あなたのケースでは、出力タイプはRequestProcessMsgSuccessです。

まず、PartitionWithを使用し、あなたのbuild.sbtに次の依存関係を追加するには:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.8" 

第二に、パーティションでinflateFlowを置き換える:

def split = PartitionWith[MsgRequest, RequestProcess, MsgSuccess] { msgRequest => 
    val hash = hashMethod(msgRequest) 
    if (!existisInDataBase(hash)) 
    Left(inflate(msgRequest)) 
    else 
    Right(MsgSuccess(hash)) 
} 

を次に、あなたのグラフにその舞台を組み込む:

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 

    val pw = builder.add(split) 
    val flowProcess = builder.add(wrapFunctionInFlowShape[RequestProcess, SuccessProcess](convertFlow)) 
    val flowWrite = builder.add(wrapFunctionInFlowShape[SuccessProcess, SuccessProcess](writeFlow)) 
    val flowWrap = builder.add(wrapFunctionInFlowShape[SuccessProcess, MsgSuccess](wrapFlow)) 

    val mrg = builder.add(Merge[MsgSuccess](2)) 

    pw.out0 ~> flowProcess ~> flowWrite ~> flowWrap ~> mrg.in(0) 
    pw.out1 ~> mrg.in(1) 

    FlowShape(pw.in, mrg.out) 
}) 

iデータベース内にMsgRequestという名前が見つかりませんでした。RequestProcessに変換されると、そのメッセージは元のフローパスを通過します。入力されたMsgRequestがデータベースにあり、MsgSuccessに解決された場合、フローの中間ステップをバイパスします。どちらの場合も、結果として得られるメッセージは、2つの代替パスから1つのフロー・アウトレットにマージされます。

関連する問題