2016-10-31 14 views
2

私はクライアントサイドのwebsocketを、webSocketClientFlowの文書に従っています。Akka-HTTPクライアントwebsocketの使い方

サンプルコードは次のとおりです。

import akka.actor.ActorSystem 
import akka.Done 
import akka.http.scaladsl.Http 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl._ 
import akka.http.scaladsl.model._ 
import akka.http.scaladsl.model.ws._ 

import scala.concurrent.Future 

object WebSocketClientFlow { 
    def main(args: Array[String]) = { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    import system.dispatcher 

    // Future[Done] is the materialized value of Sink.foreach, 
    // emitted when the stream completes 
    val incoming: Sink[Message, Future[Done]] = 
     Sink.foreach[Message] { 
     case message: TextMessage.Strict => 
      println(message.text) 
     } 

    // send this as a message over the WebSocket 
    val outgoing = Source.single(TextMessage("hello world!")) 

    // flow to use (note: not re-usable!) 
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) 

    // the materialized value is a tuple with 
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that 
    // completes or fails when the connection succeeds or fails 
    // and closed is a Future[Done] with the stream completion from the incoming sink 
    val (upgradeResponse, closed) = 
     outgoing 
     .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
     .toMat(incoming)(Keep.both) // also keep the Future[Done] 
     .run() 

    // just like a regular http request we can access response status which is available via upgrade.response.status 
    // status code 101 (Switching Protocols) indicates that server support WebSockets 
    val connected = upgradeResponse.flatMap { upgrade => 
     if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
     } else { 
     throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     } 
    } 

    // in a real application you would not side effect here 
    connected.onComplete(println) 
    closed.foreach(_ => println("closed")) 
    } 
} 

後は、サーバー側のWebSocketのためにconnection送信メッセージを使用する方法、接続をアップグレードしていましたか?

私は、docから気づい:

をこのメソッドによって返される流れは一度だけ実現することができます。要求ごとに、メソッドを再度呼び出すことによって新しいフローを取得する必要があります。

まだ混乱していますが、アップグレードされた接続が準備完了してから何度もフローを構築する必要があるのはなぜですか。

+1

申し訳ありませんが、あなたが求めているものを実際に明確ではありません。それぞれのストリームにメッセージをプッシュすることによって、Webソケット接続を介してメッセージを送信します。特定のケースでは、1つのメッセージ( 'TextMessage(" hello world! ")')のみがサーバーに送信され、その後に出力ストリームは閉じられます。 ( 'Sink.foreach'から作成された)着信ストリームは、サーバがストリームを閉じるまで、または受信タイムアウトが始まるまで、メッセージを受信し続けます。必要ならば、適切な' Source'を構築する必要があります。より複雑な方法で送信するものを制御します。 –

+0

@VladimirMatveev思い出してくれてありがとう。私はakka-httpがどのように動作しているのか誤解しており、ドキュメントを深く読んでいると思います。どうもありがとうございます! – xring

答えて

4

アクターベースのソースを作成し、確立されたwebsocket接続を介して新しいメッセージを送信することができます。

val req = WebSocketRequest(uri = "ws://127.0.0.1/ws") 
    val webSocketFlow = Http().webSocketClientFlow(req) 

    val messageSource: Source[Message, ActorRef] = 
     Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail) 

    val messageSink: Sink[Message, NotUsed] = 
     Flow[Message] 
      .map(message => println(s"Received text message: [$message]")) 
      .to(Sink.ignore) 

    val ((ws, upgradeResponse), closed) = 
     messageSource 
      .viaMat(webSocketFlow)(Keep.both) 
      .toMat(messageSink)(Keep.both) 
      .run() 

    val connected = upgradeResponse.flatMap { upgrade => 
     if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
      Future.successful(Done) 
     } else { 
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     } 
    } 

    ws ! TextMessage.Strict("Hello World") 
    ws ! TextMessage.Strict("Hi") 
    ws ! TextMessage.Strict("Yay!") 

`

関連する問題