2016-12-24 13 views
2

ここに私のwebsocketサーバーの実装があります。Akka websocket - サーバー接続を閉じる方法は?

val route = get { 
    pathEndOrSingleSlash { 
    handleWebSocketMessages(websocketFlow) 
    } 
} 

def websocketFlow: Flow[Message, Message, Any] = 
    Flow[Message] 
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) } 
    .via(chatActorFlow(UUID.randomUUID())) 
    .map(event => TextMessage.Strict(protocol.serialize(event))) 


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = { 

    val sink = Flow[Protocol.Message] 
    .map(msg => Protocol.SignedMessage(connectionId, msg)) 
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId))) 

    val source = Source 
     .mapMaterializedValue { 
     actor : ActorRef => { 
      chatRef ! Protocol.OpenConnection(actor, connectionId) 
     } 
     } 

    Flow.fromSinkAndSource(sink, source) 
} 

タイプConnectionClosedのメッセージがchatRefによって送信された後、接続を閉じるにはどのような方法がある場合、私は思ったんだけど?

+0

コードから、実際には 'chatRef'が' ConnectionClosed'メッセージを受け取る( 'Sink.actorRef'のonCompleteメッセージとして)ように見えます。達成したいことを明確にすることができますか? –

+0

私は手動でサーバー側からの接続を終了したいと思います。禁止されたユーザーがサーバーで閉じることを望む接続を開くたびに、「禁止されたユーザー」のリストがあると仮定します。 –

答えて

4

以下の解決策では、Source.actorRefステージでマテリアライズされたActorを終了することにより、サーバー側からの接続を切断できます。これは、単にPoisonPillを送ることによって行われます。

接続時に「禁止された」クライアントを識別する方法はまだわかりません。そのため、例は非常に簡単です。クライアントが接続されています。他の戦略を使用してクライアントをいつでも抹消したい場合は、同じロジックを適用してPoisonPillを自分のソースアクターに送ることができます。

object ChatApp extends App { 

    implicit val system = ActorSystem("chat") 
    implicit val executor: ExecutionContextExecutor = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    val route = get { 
    pathEndOrSingleSlash { 
     handleWebSocketMessages(websocketFlow) 
    } 
    } 

    val maximumClients = 1 

    class ChatRef extends Actor { 
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef]) 

    def withClients(clients: Map[UUID, ActorRef]): Receive = { 
     case SignedMessage(uuid, msg) => clients.collect{ 
     case (id, ar) if id != uuid => ar ! msg 
     } 
     case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill 
     case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar))) 
     case CloseConnection(uuid) => context.become(withClients(clients - uuid)) 
    } 
    } 

    object Protocol { 
    case class SignedMessage(uuid: UUID, msg: String) 
    case class OpenConnection(actor: ActorRef, uuid: UUID) 
    case class CloseConnection(uuid: UUID) 
    } 

    val chatRef = system.actorOf(Props[ChatRef]) 

    def websocketFlow: Flow[Message, Message, Any] = 
    Flow[Message] 
     .mapAsync(1) { 
     case TextMessage.Strict(s) => Future.successful(s) 
     case TextMessage.Streamed(s) => s.runFold("")(_ + _) 
     case b: BinaryMessage => throw new Exception("Binary message cannot be handled") 
     }.via(chatActorFlow(UUID.randomUUID())) 
     .map(TextMessage(_)) 

    def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = { 

    val sink = Flow[String] 
     .map(msg => Protocol.SignedMessage(connectionId, msg)) 
     .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId))) 

    val source = Source.actorRef(16, OverflowStrategy.fail) 
     .mapMaterializedValue { 
     actor : ActorRef => { 
      chatRef ! Protocol.OpenConnection(actor, connectionId) 
     } 
     } 

    Flow.fromSinkAndSource(sink, source) 
    } 

    Http().bindAndHandle(route, "0.0.0.0", 8080) 
    .map(_ => println(s"Started server...")) 

} 
+0

ねえ、これは私が探していた答えです!私の場合、クライアントは接続が開いてから30秒以内に自分自身を認証しなければなりません。そうでない場合はサーバが接続を切断します。 'PoisonPill'は仕事を優しくしてくれてありがとう! –

関連する問題