2017-11-20 5 views
1

私は、sqsから読み込み、別のシステムに書き込んだり、sqsから削除するグラフを持っています。 SQSから削除するために、私は、HTTPの場合SqsMessageオブジェクトAkkaストリームはサードパーティのフロー/ステージの戻り値の型を返します

にレシートハンドルを必要とするが、流れの署名は、私は流れの下流放射されますタイプを言うことができます流れる

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool] 

この場合、私はTをSqsMessageに設定することができ、私はまだ必要なすべてのデータを持っています。

ただし、グーグルクラウドパブサブコネクタなどのコネクタの一部は、完全に役に立たない(私にとって)パブサブIDを発信します。

pubサブフローの下流私はpubサブフローの前にあったsqsメッセージIDにアクセスできる必要があります。パブサブコネクタ

を書き換えることなく、これを回避するための最良の方法は何

私は概念的には少しこのような何かしたい:あなたはパススルー統合パターンを使用することができます

Flow[SqsMessage] //i have my data at this point 
within(
.map(toPubSubMessage) 
.via(pubSub)) 

... from here i have the same type i had before within however it still behaves like a linear graph with back pressure etc 

答えて

1

を。 akka-streams-kafkaに使用方法の表情の一例として - >クラスakka.kafka.scaladsl.Producer - > Mehtod DEF F low[K, V, PassThrough]

だから​​

package my.package 

import java.util.concurrent.atomic.AtomicInteger 

import scala.concurrent.Future 
import scala.util.{Failure, Success, Try} 

import akka.stream._ 
import akka.stream.ActorAttributes.SupervisionStrategy 
import akka.stream.stage._ 

final case class Message[V, PassThrough](record: V, passThrough: PassThrough) 

final case class Result[R, PassThrough](result: R, message: PassThrough) 

class PathThroughStage[R, V, PassThrough] 
    extends GraphStage[FlowShape[Message[V, PassThrough], Future[Result[R, PassThrough]]]] { 

    private val in = Inlet[Message[V, PassThrough]]("messages") 
    private val out = Outlet[Result[R, PassThrough]]("result") 
    override val shape = FlowShape(in, out) 

    override protected def createLogic(inheritedAttributes: Attributes) = { 
    val logic = new GraphStageLogic(shape) with StageLogging { 
     lazy val decider = inheritedAttributes.get[SupervisionStrategy] 
     .map(_.decider) 
     .getOrElse(Supervision.stoppingDecider) 
     val awaitingConfirmation = new AtomicInteger(0) 
     @volatile var inIsClosed = false 

     var completionState: Option[Try[Unit]] = None 

     override protected def logSource: Class[_] = classOf[PathThroughStage[R, V, PassThrough]] 

     def checkForCompletion() = { 
     if (isClosed(in) && awaitingConfirmation.get == 0) { 
      completionState match { 
      case Some(Success(_)) => completeStage() 
      case Some(Failure(ex)) => failStage(ex) 
      case None => failStage(new IllegalStateException("Stage completed, but there is no info about status")) 
      } 
     } 
     } 

     val checkForCompletionCB = getAsyncCallback[Unit] { _ => 
     checkForCompletion() 
     } 

     val failStageCb = getAsyncCallback[Throwable] { ex => 
     failStage(ex) 
     } 

     setHandler(out, new OutHandler { 
     override def onPull() = { 
      tryPull(in) 
     } 
     }) 

     setHandler(in, new InHandler { 
     override def onPush() = { 
      val msg = grab(in) 
      val f = Future[Result[R, PassThrough]] { 
      try { 
       Result(// TODO YOUR logic 
       msg.record, 
       msg.passThrough) 
      } catch { 
       case exception: Exception => 
       decider(exception) match { 
        case Supervision.Stop => 
        failStageCb.invoke(exception) 
        case _ => 
        Result(exception, msg.passThrough) 
       } 
      } 

      if (awaitingConfirmation.decrementAndGet() == 0 && inIsClosed) checkForCompletionCB.invoke(()) 
      } 
      awaitingConfirmation.incrementAndGet() 
      push(out, f) 
     } 

     override def onUpstreamFinish() = { 
      inIsClosed = true 
      completionState = Some(Success(())) 
      checkForCompletion() 
     } 

     override def onUpstreamFailure(ex: Throwable) = { 
      inIsClosed = true 
      completionState = Some(Failure(ex)) 
      checkForCompletion() 
     } 
     }) 

     override def postStop() = { 
     log.debug("Stage completed") 
     super.postStop() 
     } 
    } 
    logic 
    } 
} 
PassThrough要素、例を使用して独自のステージを実装
関連する問題