2017-03-10 10 views
2

ウェブサイトのボタンをクリックしてコマンドを表し、websocket経由でプログラムにそのコマンドを発行し、プログラムを処理させたいコマンド(副作用を生成する)を作成し、そのコマンドの結果をレンダリングするWebサイトに返します。Akka Streamsの副作用を使用してwebsocketから受け取ったコマンドを実装する

websocketは、ユーザービュー内の異なるアクタによって適用される状態の変更を更新する責任があります。

例:ウェブサイトからAI命令を変更しています。これによっていくつかの値が変更され、ウェブサイトに報告されます。他のユーザが他のAI命令を変更したり、AIが現在の状況の変化に反応して、クライアントが画面を更新する必要が生じたりすることがあります。

私は、変更された情報でクライアントを更新する責任を負う俳優がいて、受信ストリームで状態を変更で更新できると思っていましたか?

これは適切なライブラリですか?私が欲しいものを達成するためのよりよい方法はありますか?

答えて

1

これにはakka-streamsとakka-httpを使用できます。ハンドラとして俳優を使った例:

package test 

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.{Message, TextMessage} 
import akka.http.scaladsl.server.Directives._ 
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete} 
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} 
import akka.pattern.pipe 

import scala.concurrent.{ExecutionContext, Future} 
import scala.io.StdIn 

object Test extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit def executionContext: ExecutionContext = actorSystem.dispatcher 

    val routes = 
    path("talk") { 
     get { 
     val handler = actorSystem.actorOf(Props[Handler]) 
     val flow = Flow.fromSinkAndSource(
      Flow[Message] 
      .filter(_.isText) 
      .mapAsync(4) { 
       case TextMessage.Strict(text) => Future.successful(text) 
       case TextMessage.Streamed(textStream) => textStream.runReduce(_ + _) 
      } 
      .to(Sink.actorRefWithAck[String](handler, Handler.Started, Handler.Ack, Handler.Completed)), 
      Source.queue[String](16, OverflowStrategy.backpressure) 
      .map(TextMessage.Strict) 
      .mapMaterializedValue { queue => 
       handler ! Handler.OutputQueue(queue) 
       queue 
      } 
     ) 
     handleWebSocketMessages(flow) 
     } 
    } 

    val bindingFuture = Http().bindAndHandle(routes, "localhost", 8080) 

    println("Started the server, press enter to shutdown") 
    StdIn.readLine() 

    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => actorSystem.terminate()) 
} 

object Handler { 
    case object Started 
    case object Completed 
    case object Ack 
    case class OutputQueue(queue: SourceQueueWithComplete[String]) 
} 

class Handler extends Actor with Stash { 
    import context.dispatcher 

    override def receive: Receive = initialReceive 

    def initialReceive: Receive = { 
    case Handler.Started => 
     println("Client has connected, waiting for queue") 
     context.become(waitQueue) 
     sender() ! Handler.Ack 

    case Handler.OutputQueue(queue) => 
     println("Queue received, waiting for client") 
     context.become(waitClient(queue)) 
    } 

    def waitQueue: Receive = { 
    case Handler.OutputQueue(queue) => 
     println("Queue received, starting") 
     context.become(running(queue)) 
     unstashAll() 

    case _ => 
     stash() 
    } 

    def waitClient(queue: SourceQueueWithComplete[String]): Receive = { 
    case Handler.Started => 
     println("Client has connected, starting") 
     context.become(running(queue)) 
     sender() ! Handler.Ack 
     unstashAll() 

    case _ => 
     stash() 
    } 

    case class ResultWithSender(originalSender: ActorRef, result: QueueOfferResult) 

    def running(queue: SourceQueueWithComplete[String]): Receive = { 
    case s: String => 
     // do whatever you want here with the received message 
     println(s"Received text: $s") 

     val originalSender = sender() 
     queue 
     .offer("some response to the client") 
     .map(ResultWithSender(originalSender, _)) 
     .pipeTo(self) 

    case ResultWithSender(originalSender, result) => 
     result match { 
     case QueueOfferResult.Enqueued => // okay 
      originalSender ! Handler.Ack 
     case QueueOfferResult.Dropped => // due to the OverflowStrategy.backpressure this should not happen 
      println("Could not send the response to the client") 
      originalSender ! Handler.Ack 
     case QueueOfferResult.Failure(e) => 
      println(s"Could not send the response to the client: $e") 
      context.stop(self) 
     case QueueOfferResult.QueueClosed => 
      println("Outgoing connection to the client has closed") 
      context.stop(self) 
     } 

    case Handler.Completed => 
     println("Client has disconnected") 
     queue.complete() 
     context.stop(self) 

    case Status.Failure(e) => 
     println(s"Client connection has failed: $e") 
     e.printStackTrace() 
     queue.fail(new RuntimeException("Upstream has failed", e)) 
     context.stop(self) 
    } 
} 

あり微調整することができ、ここで多くの場所がありますが、基本的な考え方は同じまま。あるいは、メソッドで必要とされるFlow[Message, Message, _]GraphStageを使用して実装することもできます。上記で使用されているすべてのことについては、akka-streamsドキュメントで詳しく説明しています。

関連する問題