2017-06-23 1 views
0

私はAkkaストリームでいくつかのグラフを作成しようとしています。基本的には、カフカサーバーとTCPサーバーとの間でメッセージをルーティングする必要があります。難しい部分は、私が時々TCPサーバーに答えるために発生します。KafkaとTCPの間のAkkaストリームゲートウェイ

  • ケース1:TCP - >カフカ(OK)

  • ケース2:カフカ - > TCP(OK)

  • ケース3:TCP - > TCP

  • ケース4: - > TCP

カルシウム私はいくつかのメッセージを受け取ったときに発生し、それをKafkaに公開する前にサーバーにいくつかの精度を求めなければならなかった。

ケース4が発生し、最初のハンドシェイクを送信してTCP接続を開く。

私は、フィードバックループ考えるSource.actoRefファンアウトが、まだそれを構築することはできません。

これは主に私が考えたグラフである:

          +------------------+ 
              |     | 
              | TCP msg   | 
              | (Source.actorRef)| 
              |     | 
              +----+-------------+ 
               | 
               | 
+------------+  +-----------------+ +----v----+  +--------------+ 
|   +------->     +---->   +------>Kafka (Sink) | 
| TCP  |  | TLS   | | Router |  +--------------+ 
| (flow) |  | (bi-dir flow) | | (???) |  +--------------+ 
|   <-------+     <----+   <------+Kafka (Source)| 
+------------+  +-----------------+ +---------+  +--------------+ 

私はGraphStageでいくつかのパーソナライズされたグラフを構築する必要があると思いますが、いくつかのreferenciesを欠場します。特にRouterの場合、3つの入力を受け入れ、2つの異なる出力に応答を転送できなければなりません。

手がかりがある場合は、私はそれを愛するでしょう。

ありがとうございます!

答えて

0

それはあなたの

sink2: Msg2(from source2) 
sink1: Msg1(from source1 (actor)) 
sink2: Msg2(from source1 (actor)) 
flow: forward msg Msg3(from source1 (actor)) to Msg2(from the flow) 
sink2: Msg2(from the flow) 

は、それが誰かの役に立てば幸い、次の出力が得られます

class Msg 
    case class Msg1(string: String) extends Msg 
    case class Msg2(string: String) extends Msg 
    case class Msg3(string: String) extends Msg 

def optionFilter[T]: Flow[Option[T], T, NotUsed] = { 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

val graph = GraphDSL.create() { implicit builder => 
    { 
    def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1) 
    val partition: UniformFanOutShape[Option[T], Option[T]] = 
     builder.add(Partition[Option[T]](2, partitioner)) 

    val flow = builder.add(Merge[T](1)) 

    partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> flow 
    partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> Sink.ignore 

    FlowShape(partition.in, flow.out) 
    } 
    } 
    Flow.fromGraph(graph) 
    } 

val msg1Filter = optionFilter[Msg1] 

def router(
    source1: Source[Msg, ActorRef], 
    source2: Source[Msg, _], 
    sink1: Sink[Msg1, _], 
    sink2: Sink[Msg2, _], 
    flow: Flow[Msg3, Msg, _] 
): ActorRef = { 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    val graph: Graph[SinkShape[Msg], NotUsed] = GraphDSL.create() { implicit builder => 
    val unzipper: UnzipWith3[Msg, Option[Msg1], Option[Msg2], Option[Msg3]] = 
    UnzipWith { msg: Msg => 
     msg match { 
     case msg1: Msg1 => (Some(msg1), None, None) 
     case msg2: Msg2 => (None, Some(msg2), None) 
     case msg3: Msg3 => (None, None, Some(msg3)) 
     } 
    } 

    val merge = builder.add(Merge[Msg](3)) 
    val forward = builder.add(Merge[Msg](1)) 

    val unzip = builder.add(unzipper) 

    source2 ~> merge ~> unzip.in 
    forward.out ~> merge 

    unzip.out0 ~> optionFilter[Msg1] ~> sink1 
    unzip.out1 ~> optionFilter[Msg2] ~> sink2 
    unzip.out2 ~> optionFilter[Msg3] ~> flow ~> merge 

    SinkShape(forward.in(0)) 
} 

val sink: Sink[Msg, NotUsed] = Sink.fromGraph(graph) 

sink.runWith(source1) 
} 

val source1 = Source.actorRef(4096, OverflowStrategy.fail) 
val source2 = Source(List(Msg2("from source2"))) 

val sink1: Sink[Msg1, Future[Done]] = Sink.foreach((msg: Msg1) => println(s"sink1: $msg")) 
val sink2: Sink[Msg2, Future[Done]] = Sink.foreach((msg: Msg2) => println(s"sink2: $msg")) 

val flow = Flow.fromFunction((msg: Msg3) => { 
    val msg2 = Msg2("from the flow") 
    println(s"flow: forward msg $msg to $msg2") 
    msg2 
}) 

val actor = router(source1, source2, sink1, sink2, flow) 

actor ! Msg1("from source1 (actor)") 
actor ! Msg2("from source1 (actor)") 
actor ! Msg3("from source1 (actor)") 

に誰を助けることができる場合、私はついにそれが...ここで、私のコードのサンプルですしまいました将来は !

関連する問題