2017-08-10 13 views
1

AkkaからStreamsテストキットを使用しようとすると、私は奇妙な動作をしています。Akka Streamsテストキットを使用すると散発的タイムアウトエラーが発生する

私は次は、カスタム定義されていません。

trait PauseFilter[T] { 

    def shouldPause(message: T): Boolean 
} 

trait MessagePauser[T] { 

    def pause(message: T): Unit 
} 

trait MessageUnPauser[T] { 

    def unPause: Option[T] 
} 

object PausableFlow { 

    def pausableFlow[T](filter: PauseFilter[T], pauser: MessagePauser[T], unpauser: MessageUnPauser[T]): Flow[T, T, _] = 

    Flow.fromGraph(
     GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
     import GraphDSL.Implicits._ 

     val initialMessagesIn = builder.add(Flow[T]) 
     val finalMessagesOut = builder.add(Flow[T]) 
     val unPauserIterator =() => Iterator.continually(unpauser.unPause) 
     val unpausedMessages = Source.fromIterator(unPauserIterator).collect { case Some(m) => m } 
     val pausedMessagesSink = Sink.foreach(pauser.pause) 
     val determineIfPaused = builder.add(Partition[T](2, message => if (filter.shouldPause(message)) 1 else 0)) 
     val merge = builder.add(Merge[T](2)) 

     unpausedMessages ~> merge 
     initialMessagesIn ~> merge ~> determineIfPaused 
               determineIfPaused.out(1) ~> pausedMessagesSink 
               determineIfPaused.out(0) ~> finalMessagesOut 

     FlowShape(initialMessagesIn.in, finalMessagesOut.out) 
     } 
    ) 
} 

それに加えて、私は流れの仕様で、以下の健全性チェックテストを定義している:何が起こっている

test("a pauser that pauses half the messages should propagate only half the messages and pause the other half") { 
    val filter = new PauseFilter[Int] { 
    override def shouldPause(m: Int): Boolean = if (m % 2 == 0) true else false 
    } 

    val unpauser = mock[MessageUnPauser[Int]] 
    Mockito.when(unpauser.unPause).thenReturn(None) 

    val pauser = mock[MessagePauser[Int]] 

    val (pub, sub) = getTestHandles(PausableFlow.pausableFlow[Int](filter, pauser, unpauser)) 

    sub.request(200) 
    1 to 100 foreach pub.sendNext 
    1 to 100 filter (m => m % 2 == 1) foreach sub.expectNext 
    1 to 100 filter (m => m % 2 == 0) foreach Mockito.verify(pauser, Mockito.times(1)).pause 
} 

private def getTestHandles[T](flow: Flow[T, T, _]) = 
    TestSource.probe[T] 
    .via(flow) 
    .toMat(TestSink.probe[T])(Keep.both) 
    .run() 

は、ということです次の例外を除いて散発的なエラーが発生します。

assertion failed: timeout (3 seconds) during expectMsg: 
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: 
    at scala.Predef$.assert(Predef.scala:170) 
    at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:405) 
    at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814) 
    at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:716) 
    at akka.stream.testkit.TestPublisher$Probe.sendNext(StreamTestKit.scala:173) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70) 
    at scala.collection.immutable.Range.foreach(Range.scala:160) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply$mcV$sp(PausableFlowSpec.scala:70) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57) 
    at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57) 
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) 
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) 
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
    at org.scalatest.Transformer.apply(Transformer.scala:22) 
    at org.scalatest.Transformer.apply(Transformer.scala:20) 
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) 
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122) 
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) 
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) 
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) 
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) 
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175) 
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) 
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208) 
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) 
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) 
    at scala.collection.immutable.List.foreach(List.scala:392) 
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) 
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) 
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) 
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208) 
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555) 
    at org.scalatest.Suite$class.run(Suite.scala:1424) 
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555) 
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) 
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212) 
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545) 
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212) 
    at com.company.stream.pausable.PausableFlowSpec.org$scalatest$BeforeAndAfterAll$$super$run(PausableFlowSpec.scala:16) 
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) 
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) 
    at com.company.stream.pausable.PausableFlowSpec.run(PausableFlowSpec.scala:16) 
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) 
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) 
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) 
    at scala.collection.immutable.List.foreach(List.scala:392) 
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) 
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) 
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) 
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) 
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) 
    at org.scalatest.tools.Runner$.run(Runner.scala:883) 
    at org.scalatest.tools.Runner.run(Runner.scala) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138) 
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) 

実際のコードに例外を結びつけることができるように、あなたに参照点を与えるためにコードアウトしてください。例外は、1 to 100 foreach pub.sendNextが呼び出されたときにスローされます。このような行動を引き起こすために時間がかかることがないので、なぜこのようなことが起こるのか、私は非常に困惑しています。多分私はここで何かを逃しているでしょう。あなたの意見を得るのは素晴らしいことでしょう。

答えて

0

問題はunPausedMessagesにあります。これはストリームを強制終了します。この通話は、 val unpausedMessages = Source.tick(FiniteDuration(0, "ms"), FiniteDuration(100, "ms"), unpauser.unPause) .collect { case Some(m) => m }または同等のものとすることを検討してください。

他に起こることは、多くのアイテムを消費せずにストリームにプッシュしようとすることです。 コンシューマに同時に実行するFuture内で1 to 100 foreach pub.sendNextを実行することを検討してください。

関連する問題