2017-07-07 3 views
1

（質問タイトル：レコード乗数を持つグラフにおける At-Least-Once 配信セマンティクス）

私はKafkaに対してAkka Streams 2.4.8を使用しており、At-Least-Once（少なくとも1回）配信セマンティクスを満たすグラフを作成しようとしています。mapOneRecordToNOtherRecords は、単一のメッセージを受け取り、それを別の種類の任意個（N個）のメッセージに変換するステージです。私のグラフのいくつかは、次のような形で始まります:

committableSource ~> processing ~> mapOneRecordToNOtherRecords ~> ... 

つまり、このステージを通過すると、各着信メッセージはN個のメッセージに増え、それらはすべて同じCommittableOffsetを持ちます。問題は、ALOS（At-Least-Once Semantics）をサポートするには、N個すべての最終処理が完了するまでコミットを待つ必要があるということです。

Flow#mapConcatを使用してmapOneRecordToNOtherRecordsの出力を平坦化し、ダウンストリームへ流しています。mapConcatの出力は順序が保持されると理解しています。したがって、特定の相関識別子（たとえばCommittableOffset）を持つN個のメッセージを収集し、すべてが揃った時点でそのうちの最後の1つを送出するグラフ要素が必要です。

Flow#groupByメソッドはこの目的に使えそうに見えますが、サブストリームを閉じる手段（シャットオフバルブ）がなく、キーを破棄する基準を指定する方法も見当たりません。また、特定のキーについて要素を送出する基準を指定する方法もありません。他のStack Overflowの投稿を読む限り、groupByをこのように使うと実際にメモリリークを引き起こす可能性があるようです。

ほかの解決策がないかドキュメントを調べましたが、何も見つかりませんでした。適切な方法を教えていただけないでしょうか。これはよくある要件だと思うのですが、それともカスタムGraphStageを自作する必要があるのでしょうか?

回答

0

Akkaのドキュメントでは答えが見つからなかったため、カスタムのGraphStageを書きました。以下がそのコードです。

/**
 * Accumulates records into groups keyed by `propertyExtractor` and emits a group downstream once `bufferFull` reports
 * it complete.
 *
 * Groups are emitted in the order in which the first record of each pending group arrived. If a group with key P2
 * completes before a group with key P1, and the first P1 arrived before the first P2, the P2 group is held back until
 * the P1 group has been emitted. Once a group is emitted, a later record with the same key starts a new group at the
 * back of that order.
 *
 * Timeout: whenever a record arrives, the stage checks how long the oldest pending group has been at the head. If that
 * exceeds `timeout`, the stage fails with a [[java.util.concurrent.TimeoutException]]. The check only runs when a
 * record arrives; a non-finite `timeout` disables it.
 *
 * Upstream completion: completed groups are emitted in order up to, but not including, the first incomplete group.
 * That group and every group after it are dropped.
 *
 * @param propertyExtractor finds the property on an incoming record by which records are grouped
 * @param bufferFull        decides whether the accumulated records of a group may be emitted back to the stream
 * @param timeout           (optional, default 10 seconds) how long the oldest pending group may stay incomplete
 *                          before the stage fails with a TimeoutException
 * @tparam E the type of the record
 * @tparam P the type of the property acting as a key for the record
 */
final class AccumulateWhileNotFull[E, P](
  propertyExtractor: E => P,
  bufferFull: (Seq[E]) => Boolean,
  timeout: Duration = 10.seconds
) extends GraphStage[FlowShape[E, Seq[E]]] {

  val in = Inlet[E]("AccumulateWhileNotFull.in")
  val out = Outlet[Seq[E]]("AccumulateWhileNotFull.out")

  override def shape: FlowShape[E, Seq[E]] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Pending groups. Insertion order is the order in which each group's first record arrived.
    // Updating an existing key keeps its position.
    private val buffers = mutable.LinkedHashMap[P, Vector[E]]()
    // Time at which a key was first observed at the head of `buffers`. Only head keys are ever recorded.
    private val timing = mutable.Map[P, DateTime]()

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val element = grab(in)
        val key = propertyExtractor(element)
        buffers.update(key, buffers.getOrElse(key, Vector.empty[E]) :+ element)

        failIfHeadTimedOut()

        // `in` is only pulled while `out` has unmet demand, so `out` is available here.
        // Push OR pull, never both: doing both would make the next onPull pull `in` a second time.
        if (!pushHeadIfFull()) pull(in)
      }

      override def onPull(): Unit = {
        // The head may already be complete (it completed while an older group was still pending), so emit it
        // without asking upstream for more.
        if (!pushHeadIfFull()) pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        // Go in order; the first incomplete group stops emission, so it and everything after it are dropped.
        buffers.valuesIterator.takeWhile(bufferFull).foreach(group => emit(out, group))
        completeStage()
      }

      /** Pushes the oldest pending group if `bufferFull` accepts it; returns whether a push happened. */
      private def pushHeadIfFull(): Boolean =
        buffers.headOption match {
          case Some((key, group)) if bufferFull(group) =>
            buffers -= key
            timing -= key
            push(out, group)
            true
          case _ =>
            false
        }

      /** Throws a TimeoutException if the oldest pending group has been at the head for longer than `timeout`. */
      private def failIfHeadTimedOut(): Unit =
        if (timeout.isFinite) {
          buffers.headOption.foreach { case (head, _) =>
            val headSince = timing.getOrElseUpdate(head, DateTime.now)
            if (headSince.isBefore(DateTime.now().minus(timeout.toMillis))) {
              // reset timer in case the exception is to be ignored
              timing += head -> DateTime.now
              throw new TimeoutException(s"Failed to process expected number of messages for property ${head} within ${timeout}")
            }
          }
        }
    })

    override def postStop(): Unit = {
      buffers.clear()
      timing.clear()
    }
  }
}

また、これを検証するための統合テストは次のとおりです。

import java.util.concurrent.TimeoutException

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.scalatest.{ BeforeAndAfterAll, FunSuiteLike, Matchers }

import scala.concurrent.duration.Duration
import scala.util.Try
import scala.concurrent.duration._

class AccumulateWhileNotFullTest extends TestKit(ActorSystem("AccumulateWhileNotFullTest")) with FunSuiteLike with Matchers with BeforeAndAfterAll {
  implicit lazy val materializer: Materializer = ActorMaterializer()

  // Stop the actor system created for this suite once all tests have run.
  override def afterAll(): Unit = {
    try TestKit.shutdownActorSystem(system)
    finally super.afterAll()
  }

  test("test works singly") {
    val result = runBasic(List(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    result.request(1)
    result.expectComplete()
  }

  test("test works with never complete") {
    // The only group never reaches its expected size, so nothing is emitted before completion.
    val result = runBasic(List(Thing("a", 1, 2)))
    result.request(1)
    result.expectComplete()
  }

  test("test resets keys") {
    val result = runBasic(List(Thing("a", 1, 1), Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    result.requestNext() should be(Seq(Thing("a", 1, 1)))
    result.request(1)
    result.expectComplete()
  }

  test("test accumulates multiple in a row") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.request(1)
    result.expectComplete()
  }

  test("test accumulates multiple in order") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3), Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    result.request(1)
    result.expectComplete()
  }

  test("test handles out of order correctly") {
    val result = runBasic(List(Thing("a", 1, 3), Thing("b", 5, 2), Thing("a", 5, 3), Thing("c", 5, 1), Thing("a", 10, 3), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("a", 1, 3), Thing("a", 5, 3), Thing("a", 10, 3)))
    result.requestNext() should be(Seq(Thing("b", 5, 2), Thing("b", 10, 2)))
    result.requestNext() should be(Seq(Thing("c", 5, 1)))
    result.request(1)
    result.expectComplete()
  }

  test("test timeout throws exception") {
    // The stage only checks its timeout when a record arrives. So the incomplete "a" group is followed by a
    // record that shows up after the timeout has elapsed, and upstream is kept open until then.
    val result = Source.single(Thing("a", 1, 2))
      .concat(Source.single(Thing("b", 1, 1)).initialDelay(300.millis))
      .via(accumulator(100.millis))
      .runWith(TestSink.probe[Seq[Thing]](system))
    result.request(1)
    val error = result.expectError()
    error shouldBe a[TimeoutException]
    error.getMessage should be("Failed to process expected number of messages for property a within 100 milliseconds")
  }

  def runBasic(things: List[Thing], timeout: Duration = 10.seconds)(implicit actorSystem: ActorSystem, materializer: Materializer) =
    Source.fromIterator(() => things.toIterator).via(accumulator(timeout)).runWith(TestSink.probe[Seq[Thing]](actorSystem))

  /** Stage under test: groups by `prop` and treats a group as full once it holds `count` records. */
  private def accumulator(timeout: Duration): Flow[Thing, Seq[Thing], _] =
    Flow.fromGraph(new AccumulateWhileNotFull[Thing, String](
      _.prop,
      vals => vals.headOption.exists(v => vals.size == v.count),
      timeout
    ))

  // `count` is the number of records expected for the group identified by `prop`.
  case class Thing(prop: String, value: Int, count: Int)

}
関連する問題