AkkaからStreamsテストキットを使用しようとすると、私は奇妙な動作をしています。Akka Streamsテストキットを使用すると散発的タイムアウトエラーが発生する
私は次は、カスタム定義されていません。
trait PauseFilter[T] {
def shouldPause(message: T): Boolean
}
trait MessagePauser[T] {
def pause(message: T): Unit
}
trait MessageUnPauser[T] {
def unPause: Option[T]
}
object PausableFlow {
def pausableFlow[T](filter: PauseFilter[T], pauser: MessagePauser[T], unpauser: MessageUnPauser[T]): Flow[T, T, _] =
Flow.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val initialMessagesIn = builder.add(Flow[T])
val finalMessagesOut = builder.add(Flow[T])
val unPauserIterator =() => Iterator.continually(unpauser.unPause)
val unpausedMessages = Source.fromIterator(unPauserIterator).collect { case Some(m) => m }
val pausedMessagesSink = Sink.foreach(pauser.pause)
val determineIfPaused = builder.add(Partition[T](2, message => if (filter.shouldPause(message)) 1 else 0))
val merge = builder.add(Merge[T](2))
unpausedMessages ~> merge
initialMessagesIn ~> merge ~> determineIfPaused
determineIfPaused.out(1) ~> pausedMessagesSink
determineIfPaused.out(0) ~> finalMessagesOut
FlowShape(initialMessagesIn.in, finalMessagesOut.out)
}
)
}
それに加えて、私は流れの仕様で、以下の健全性チェックテストを定義している:何が起こっている
test("a pauser that pauses half the messages should propagate only half the messages and pause the other half") {
val filter = new PauseFilter[Int] {
override def shouldPause(m: Int): Boolean = if (m % 2 == 0) true else false
}
val unpauser = mock[MessageUnPauser[Int]]
Mockito.when(unpauser.unPause).thenReturn(None)
val pauser = mock[MessagePauser[Int]]
val (pub, sub) = getTestHandles(PausableFlow.pausableFlow[Int](filter, pauser, unpauser))
sub.request(200)
1 to 100 foreach pub.sendNext
1 to 100 filter (m => m % 2 == 1) foreach sub.expectNext
1 to 100 filter (m => m % 2 == 0) foreach Mockito.verify(pauser, Mockito.times(1)).pause
}
private def getTestHandles[T](flow: Flow[T, T, _]) =
TestSource.probe[T]
.via(flow)
.toMat(TestSink.probe[T])(Keep.both)
.run()
は、ということです次の例外を除いて散発的なエラーが発生します。
assertion failed: timeout (3 seconds) during expectMsg:
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:405)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:716)
at akka.stream.testkit.TestPublisher$Probe.sendNext(StreamTestKit.scala:173)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3$$anonfun$apply$mcV$sp$5.apply(PausableFlowSpec.scala:70)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply$mcV$sp(PausableFlowSpec.scala:70)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57)
at com.company.stream.pausable.PausableFlowSpec$$anonfun$3.apply(PausableFlowSpec.scala:57)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
at org.scalatest.Suite$class.run(Suite.scala:1424)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
at com.company.stream.pausable.PausableFlowSpec.org$scalatest$BeforeAndAfterAll$$super$run(PausableFlowSpec.scala:16)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
at com.company.stream.pausable.PausableFlowSpec.run(PausableFlowSpec.scala:16)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
at org.scalatest.tools.Runner$.run(Runner.scala:883)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
実際のコードに例外を結びつけることができるように、あなたに参照点を与えるためにコードアウトしてください。例外は、1 to 100 foreach pub.sendNext
が呼び出されたときにスローされます。このような行動を引き起こすために時間がかかることがないので、なぜこのようなことが起こるのか、私は非常に困惑しています。多分私はここで何かを逃しているでしょう。あなたの意見を得るのは素晴らしいことでしょう。