2017-12-17 30 views
0

私はlocalhostのwebsocketを通していくつかのサーバに接続しようとしています。Akka-http:localhostのwebsocketに接続

ws = new WebSocket('ws://localhost:8137'); 

でJSでやろうとすると成功します。しかし、akka-httpとakka-streamsを使用すると、「接続に失敗しました」というエラーが表示されます。

object Transmitter { 
    implicit val system: ActorSystem = ActorSystem() 
    implicit val materializer: ActorMaterializer = ActorMaterializer() 

    import system.dispatcher 

    object Rec extends Actor { 
     override def receive: Receive = { 
      case TextMessage.Strict(msg) => 
       Log.info("Recevied signal " + msg) 
     } 
    } 

    // val host = "ws://echo.websocket.org" 
    val host = "ws://localhost:8137" 

    val sink: Sink[Message, NotUsed] = Sink.actorRef[Message](system.actorOf(Props(Rec)), PoisonPill) 


    val source: Source[Message, NotUsed] = Source(List("test1", "test2") map (TextMessage(_))) 


    val flow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = 
     Http().webSocketClientFlow(WebSocketRequest(host)) 

    val (upgradeResponse, closed) = 
     source 
     .viaMat(flow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
     .toMat(sink)(Keep.both) // also keep the Future[Done] 
     .run() 

    val connected: Future[Done.type] = upgradeResponse.flatMap { upgrade => 
     if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
      Future.successful(Done) 
     } else { 
      Future.failed(new Exception(s"Connection failed: ${upgrade.response.status}") 
     } 
    } 

    def test(): Unit = { 
     connected.onComplete(Log.info) 
    } 
} 

ws://echo.websocket.orgで完全に問題なく動作します。

私はJavaScriptクライアントで動作し、問題は接続のみであるため、私のサーバーのコードを添付するのは無意味だと思いますが、それを見たい場合は表示してください。

私は間違っていますか?

答えて

0

私が使用したサーバーはIPv6(as :: 1)で動作していましたが、akka-httpはlocalhostを127.0.0.1として扱い、:: 1を無視します。私はそれがIPv4を使用するように強制するサーバーを書き直さなければなりませんでした。

1

akka documentation、 からwebsocketサーバーでクライアントの実装をテストしましたが、接続エラーは発生しませんでした。 websocketクライアントが正常に接続されます。だからこそ私はあなたのサーバーの実装に問題があると推測しています。

object WebSocketServer extends App { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    import Directives._ 

    val greeterWebSocketService = Flow[Message].collect { 
    case tm: TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream) 
    } 

    val route = 
    get { 
     handleWebSocketMessages(greeterWebSocketService) 
    } 

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8137) 

    println(s"Server online at http://localhost:8137/\nPress RETURN to stop...") 
    StdIn.readLine() 

    import system.dispatcher // for the future transformations 
    bindingFuture 
    .flatMap(_.unbind()) // trigger unbinding from the port 
    .onComplete(_ => system.terminate()) // and shutdown when done 
} 

ところで、あなたの俳優の受信方法がすべての可能なメッセージをカバーしていないことに気付きました。 that akka issueによれば、すべてのメッセージは、たとえ非常に小さい場合でも、Streamedとして終了する可能性があります。あなたはすべてのテキストメッセージを印刷したい場合は、俳優の優れた実装では、次のようになります。

object Rec extends Actor { 
    override def receive: Receive = { 
    case TextMessage.Strict(text)    ⇒ println(s"Received signal $text") 
    case TextMessage.Streamed(textStream)  ⇒ textStream.runFold("")(_ + _).foreach(msg => println(s"Received streamed signal: $msg")) 
    } 
} 

my github上の作業プロジェクトを見つけてください。

+0

ご協力いただきありがとうございます。それは私の質問に正確に答えなかったにもかかわらず、私はあなたのサーバーを起動すると、もう1つはまだ実行されていたことに気付きました。はい、正確なポートです。 1つは127.0.0.1で実行され、もう1つは:: 1で実行されたということでした。 – Ikciwor

+0

あなたの例では、着信ストリームメッセージをそのように実現することはブロックされていませんか? – kyle

関連する問題