私は以下のグラフを持っています。Akka Streams - FlowShapeをジャンプする
inflateFlow
ステージでは、すでにDBで処理されているリクエストがあるかどうかを確認します。すでに処理されたメッセージがある場合は、MsgSuccess
を返信し、RequestProcess
は返信しませんが、次のFlowShape
は受け付けません。RequestProcess
が必要です。どこでもEither
を追加することなくflowInflate
からflowWrap
にジャンプする方法はありますか?
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val flowInflate = builder.add(wrapFunctionInFlowShape[MsgRequest, RequestProcess](inflateFlow))
val flowProcess = builder.add(wrapFunctionInFlowShape[RequestProcess, SuccessProcess](convertFlow))
val flowWrite = builder.add(wrapFunctionInFlowShape[SuccessProcess, SuccessProcess](writeFlow))
val flowWrap = builder.add(wrapFunctionInFlowShape[SuccessProcess, MsgSuccess](wrapFlow))
flowInflate ~> flowProcess ~> flowWrite ~> flowWrap
FlowShape(flowInflate.in, flowWrap.out)
}
def wrapFunctionInFlowShape[Input, Output](f: Input => Output): Flow[Input, Output, NotUsed] = {
Flow.fromFunction { input =>
f(input)
}
}
//check for cache
def inflateFlow(msgRequest: MsgRequest): Either[RequestProcess, MsgSuccess] = {
val hash: String = hashMethod(msgRequest)
if(existisInDataBase(hash))
Right(MsgSuccess(hash))
else
Left(inflate(msgRequest))
}
def convertFlow(requestPorocess: RequestPocess): SuccessProcess = {}//process the request}
def writeFlow(successProcess: SuccessProcess): SuccessProcess = {}//write to DB}
def wrapFlow(successProcess: SuccessProcess): MsgSuccess = {}//wrap and return the message}