アクタは、WebSocketに接続するAkkaストリームを初期化します。これは、メッセージを送信できるSource.actorRef
を使用して行われ、webSocketClientFlow
によって処理され、Sink.foreach
によって消費されます。これは、(akka docs由来)、次のコードで見ることができます。なぜPlayフレームワークはAkkaストリームを閉じませんか?
class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging {
final implicit val system: ActorSystem = ActorSystem()
final implicit val materializer: ActorMaterializer = ActorMaterializer()
def receive = {
case _ =>
}
// Consume the incoming messages from the websocket.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
case misc => println(misc)
}
// Source through which we can send messages to the websocket.
val outgoing: Source[TextMessage, ActorRef] =
Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com"))
// Materialized the stream
val ((ws,upgradeResponse), closed) =
outgoing
.viaMat(webSocketFlow)(Keep.both)
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// Check whether the server has accepted the websocket request.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Failed: ${upgrade.response.status}")
}
}
// When the connection has been established.
connected.onComplete(println)
// When the stream has closed
closed.onComplete {
case Success(_) => println("Test Websocket closed gracefully")
case Failure(e) => log.error("Test Websocket closed with an error\n", e)
}
}
プレイフレームワークは、それがTestActor閉じますが、アッカ・ストリームをクローズしません再コンパイルします。 websocketがタイムアウトしたときだけ、ストリームは閉じられます。これは私がSource.actorRef
TestActorPostStop
機能でPoisonPill
で作成した俳優を送る、例えばによって手動でストリームをクローズする必要があること
を意味するのでしょうか?
注:私もMaterializer
を注入しようとしたとActorsystem
すなわち:
@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem)
プレイを再コンパイルすると、ストリームが閉じられて、だけでなく、エラー生成されます。
[error] a.a.ActorSystemImpl - Websocket handler failed with
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]]
terminated abruptly