これにはakka-streamsとakka-httpを使用できます。ハンドラとして俳優を使った例:
package test
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}
import scala.io.StdIn
object Test extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit def executionContext: ExecutionContext = actorSystem.dispatcher
val routes =
path("talk") {
get {
val handler = actorSystem.actorOf(Props[Handler])
val flow = Flow.fromSinkAndSource(
Flow[Message]
.filter(_.isText)
.mapAsync(4) {
case TextMessage.Strict(text) => Future.successful(text)
case TextMessage.Streamed(textStream) => textStream.runReduce(_ + _)
}
.to(Sink.actorRefWithAck[String](handler, Handler.Started, Handler.Ack, Handler.Completed)),
Source.queue[String](16, OverflowStrategy.backpressure)
.map(TextMessage.Strict)
.mapMaterializedValue { queue =>
handler ! Handler.OutputQueue(queue)
queue
}
)
handleWebSocketMessages(flow)
}
}
val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080)
println("Started the server, press enter to shutdown")
StdIn.readLine()
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => actorSystem.terminate())
}
object Handler {
case object Started
case object Completed
case object Ack
case class OutputQueue(queue: SourceQueueWithComplete[String])
}
class Handler extends Actor with Stash {
import context.dispatcher
override def receive: Receive = initialReceive
def initialReceive: Receive = {
case Handler.Started =>
println("Client has connected, waiting for queue")
context.become(waitQueue)
sender() ! Handler.Ack
case Handler.OutputQueue(queue) =>
println("Queue received, waiting for client")
context.become(waitClient(queue))
}
def waitQueue: Receive = {
case Handler.OutputQueue(queue) =>
println("Queue received, starting")
context.become(running(queue))
unstashAll()
case _ =>
stash()
}
def waitClient(queue: SourceQueueWithComplete[String]): Receive = {
case Handler.Started =>
println("Client has connected, starting")
context.become(running(queue))
sender() ! Handler.Ack
unstashAll()
case _ =>
stash()
}
case class ResultWithSender(originalSender: ActorRef, result: QueueOfferResult)
def running(queue: SourceQueueWithComplete[String]): Receive = {
case s: String =>
// do whatever you want here with the received message
println(s"Received text: $s")
val originalSender = sender()
queue
.offer("some response to the client")
.map(ResultWithSender(originalSender, _))
.pipeTo(self)
case ResultWithSender(originalSender, result) =>
result match {
case QueueOfferResult.Enqueued => // okay
originalSender ! Handler.Ack
case QueueOfferResult.Dropped => // due to the OverflowStrategy.backpressure this should not happen
println("Could not send the response to the client")
originalSender ! Handler.Ack
case QueueOfferResult.Failure(e) =>
println(s"Could not send the response to the client: $e")
context.stop(self)
case QueueOfferResult.QueueClosed =>
println("Outgoing connection to the client has closed")
context.stop(self)
}
case Handler.Completed =>
println("Client has disconnected")
queue.complete()
context.stop(self)
case Status.Failure(e) =>
println(s"Client connection has failed: $e")
e.printStackTrace()
queue.fail(new RuntimeException("Upstream has failed", e))
context.stop(self)
}
}
あり微調整することができ、ここで多くの場所がありますが、基本的な考え方は同じまま。あるいは、メソッドで必要とされるFlow[Message, Message, _]
をGraphStage
を使用して実装することもできます。上記で使用されているすべてのことについては、akka-streamsドキュメントで詳しく説明しています。