それはあなたの
sink2: Msg2(from source2)
sink1: Msg1(from source1 (actor))
sink2: Msg2(from source1 (actor))
flow: forward msg Msg3(from source1 (actor)) to Msg2(from the flow)
sink2: Msg2(from the flow)
は、それが誰かの役に立てば幸い、次の出力が得られます
class Msg
case class Msg1(string: String) extends Msg
case class Msg2(string: String) extends Msg
case class Msg3(string: String) extends Msg
def optionFilter[T]: Flow[Option[T], T, NotUsed] = {
import akka.stream.scaladsl.GraphDSL.Implicits._
val graph = GraphDSL.create() { implicit builder =>
{
def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
val partition: UniformFanOutShape[Option[T], Option[T]] =
builder.add(Partition[Option[T]](2, partitioner))
val flow = builder.add(Merge[T](1))
partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> flow
partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> Sink.ignore
FlowShape(partition.in, flow.out)
}
}
Flow.fromGraph(graph)
}
val msg1Filter = optionFilter[Msg1]
def router(
source1: Source[Msg, ActorRef],
source2: Source[Msg, _],
sink1: Sink[Msg1, _],
sink2: Sink[Msg2, _],
flow: Flow[Msg3, Msg, _]
): ActorRef = {
import akka.stream.scaladsl.GraphDSL.Implicits._
val graph: Graph[SinkShape[Msg], NotUsed] = GraphDSL.create() { implicit builder =>
val unzipper: UnzipWith3[Msg, Option[Msg1], Option[Msg2], Option[Msg3]] =
UnzipWith { msg: Msg =>
msg match {
case msg1: Msg1 => (Some(msg1), None, None)
case msg2: Msg2 => (None, Some(msg2), None)
case msg3: Msg3 => (None, None, Some(msg3))
}
}
val merge = builder.add(Merge[Msg](3))
val forward = builder.add(Merge[Msg](1))
val unzip = builder.add(unzipper)
source2 ~> merge ~> unzip.in
forward.out ~> merge
unzip.out0 ~> optionFilter[Msg1] ~> sink1
unzip.out1 ~> optionFilter[Msg2] ~> sink2
unzip.out2 ~> optionFilter[Msg3] ~> flow ~> merge
SinkShape(forward.in(0))
}
val sink: Sink[Msg, NotUsed] = Sink.fromGraph(graph)
sink.runWith(source1)
}
val source1 = Source.actorRef(4096, OverflowStrategy.fail)
val source2 = Source(List(Msg2("from source2")))
val sink1: Sink[Msg1, Future[Done]] = Sink.foreach((msg: Msg1) => println(s"sink1: $msg"))
val sink2: Sink[Msg2, Future[Done]] = Sink.foreach((msg: Msg2) => println(s"sink2: $msg"))
val flow = Flow.fromFunction((msg: Msg3) => {
val msg2 = Msg2("from the flow")
println(s"flow: forward msg $msg to $msg2")
msg2
})
val actor = router(source1, source2, sink1, sink2, flow)
actor ! Msg1("from source1 (actor)")
actor ! Msg2("from source1 (actor)")
actor ! Msg3("from source1 (actor)")
に誰を助けることができる場合、私はついにそれが...ここで、私のコードのサンプルですしまいました将来は !