以下の解決策では、Source.actorRef
ステージでマテリアライズされたActorを終了することにより、サーバー側からの接続を切断できます。これは、単にPoisonPill
を送ることによって行われます。
接続時に「禁止された」クライアントを識別する方法はまだわかりません。そのため、例は非常に簡単です。クライアントが接続されています。他の戦略を使用してクライアントをいつでも抹消したい場合は、同じロジックを適用してPoisonPill
を自分のソースアクターに送ることができます。
object ChatApp extends App {
implicit val system = ActorSystem("chat")
implicit val executor: ExecutionContextExecutor = system.dispatcher
implicit val materializer = ActorMaterializer()
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
val maximumClients = 1
class ChatRef extends Actor {
override def receive: Receive = withClients(Map.empty[UUID, ActorRef])
def withClients(clients: Map[UUID, ActorRef]): Receive = {
case SignedMessage(uuid, msg) => clients.collect{
case (id, ar) if id != uuid => ar ! msg
}
case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
case CloseConnection(uuid) => context.become(withClients(clients - uuid))
}
}
object Protocol {
case class SignedMessage(uuid: UUID, msg: String)
case class OpenConnection(actor: ActorRef, uuid: UUID)
case class CloseConnection(uuid: UUID)
}
val chatRef = system.actorOf(Props[ChatRef])
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(s) => Future.successful(s)
case TextMessage.Streamed(s) => s.runFold("")(_ + _)
case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
}.via(chatActorFlow(UUID.randomUUID()))
.map(TextMessage(_))
def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {
val sink = Flow[String]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))
val source = Source.actorRef(16, OverflowStrategy.fail)
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}
Flow.fromSinkAndSource(sink, source)
}
Http().bindAndHandle(route, "0.0.0.0", 8080)
.map(_ => println(s"Started server..."))
}
コードから、実際には 'chatRef'が' ConnectionClosed'メッセージを受け取る( 'Sink.actorRef'のonCompleteメッセージとして)ように見えます。達成したいことを明確にすることができますか? –
私は手動でサーバー側からの接続を終了したいと思います。禁止されたユーザーがサーバーで閉じることを望む接続を開くたびに、「禁止されたユーザー」のリストがあると仮定します。 –