2017-09-30 7 views
1

アクタは、WebSocketに接続するAkkaストリームを初期化します。これは、メッセージを送信できるSource.actorRefを使用して行われ、webSocketClientFlowによって処理され、Sink.foreachによって消費されます。これは、(akka docs由来)、次のコードで見ることができます。なぜPlayフレームワークはAkkaストリームを閉じませんか?

class TestActor @Inject()(implicit ec: ExecutionContext) extends Actor with ActorLogging { 

    final implicit val system: ActorSystem = ActorSystem() 
    final implicit val materializer: ActorMaterializer = ActorMaterializer() 

    def receive = { 
    case _ => 
    } 

    // Consume the incoming messages from the websocket. 
    val incoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println(message.text) 
    case misc => println(misc) 
    } 

    // Source through which we can send messages to the websocket. 
    val outgoing: Source[TextMessage, ActorRef] = 
    Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail) 

    // flow to use (note: not re-usable!) 
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws-feed.gdax.com")) 

    // Materialized the stream 
    val ((ws,upgradeResponse), closed) = 
    outgoing 
    .viaMat(webSocketFlow)(Keep.both) 
    .toMat(incoming)(Keep.both) // also keep the Future[Done] 
    .run() 

    // Check whether the server has accepted the websocket request. 
    val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
    } else { 
     throw new RuntimeException(s"Failed: ${upgrade.response.status}") 
    } 
    } 

    // When the connection has been established. 
    connected.onComplete(println) 

    // When the stream has closed 
    closed.onComplete { 
    case Success(_) => println("Test Websocket closed gracefully") 
    case Failure(e) => log.error("Test Websocket closed with an error\n", e) 
    } 

} 

プレイフレームワークは、それがTestActor閉じますが、アッカ・ストリームをクローズしません再コンパイルします。 websocketがタイムアウトしたときだけ、ストリームは閉じられます。これは私がSource.actorRefTestActorPostStop機能でPoisonPillで作成した俳優を送る、例えばによって手動でストリームをクローズする必要があること

を意味するのでしょうか?

:私もMaterializerを注入しようとしたとActorsystemすなわち:

@Inject()(implicit ec: ExecutionContext, implicit val mat: Materializer, implicit val system: ActorSystem) 

プレイを再コンパイルすると、ストリームが閉じられて、だけでなく、エラー生成されます。

[error] a.a.ActorSystemImpl - Websocket handler failed with 
Processor actor [Actor[akka://application/user/StreamSupervisor-62/flow-0-0-ignoreSink#989719582]] 
terminated abruptly 

答えて

1

Inを最初の例では、あなたの俳優の俳優システムを作成しています。あなたはそれをするべきではありません - 俳優システムは高価で、スレッドプールを開始すること、スケジューラを開始することなどを意味します。さらに、あなたはそれをシャットダウンすることはありません。つまり、シャットダウンしないストリームよりもはるかに大きな問題があります - リソースリークがあると、アクターシステムによって作成されたスレッドプールは決してシャットダウンされません。

WebSocket接続を受け取るたびに、新しいスレッドプールセットで新しいアクターシステムが作成され、決してそれらをシャットダウンすることはありません。本番環境では、わずかな負荷(1秒あたりの要求数)であっても、アプリケーションのメモリー不足は数分で解決されます。

一般的には、Playでは自分の俳優システムを作成することはできませんが、自分の俳優システムを作成することはできません。アクタ内からは、自動的に注入される必要もありません。context.systemは、アクタを作成したアクタシステムにアクセスします。マテリアライザーと同様に、これらは重い重量ではありませんが、接続ごとに1つずつ作成すると、シャットダウンしないとメモリが足りなくなる可能性があるため、注入する必要があります。

それを注入した場合、エラーが発生します。これは避けがたいものですが、不可能ではありません。難しいのは、Akka自体が、物事を正常に閉じるために物事をシャットダウンする必要がある順序を自動的に知ることができないということです。最初に俳優をシャットダウンして、正常にストリームをシャットダウンするか、彼らはシャットダウンしてそれに応じてあなたの俳優に通知することができますか?

アッカ2.5は、このためのソリューションを持っている、俳優のシステムは多少ランダムな順序で物事を殺す開始する前に、あなたは物事がシャットダウンするように登録することができ、管理シャットダウンシーケンス、:

https://doc.akka.io/docs/akka/2.5/scala/actors.html#coordinated-shutdown

あなたが使用することができますこれはAkkaストリームkill switchesと組み合わせて、残りのアプリケーションがシャットダウンする前にストリームを正常にシャットダウンします。

しかし、一般的に、シャットダウンのエラーはかなり良質なので、私の場合は私はそれらについて心配しません。

関連する問題