2017-08-23 7 views
0

私はスカラの周りで遊び始めて、スカラーのウェブソケットチャットルームのこの特定の定型会に来ました。Scala&Play Websockets:交換されたメッセージを保存する

メッセージは、接続されたすべてのクライアントに送信するために、ソースとシンクとしてMessageHub.source()BroadcastHub.sink()を使用します。

この例は、そのままメッセージをやりとりするために問題なく動作しています。

private val (chatSink, chatSource) = { 
    // Don't log MergeHub$ProducerFailed as error if the client disconnects. 
    // recoverWithRetries -1 is essentially "recoverWith" 
    val source = MergeHub.source[WSMessage] 
    .log("source") 
    .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty }) 

    val sink = BroadcastHub.sink[WSMessage] 
    source.toMat(sink)(Keep.both).run() 
} 

private val userFlow: Flow[WSMessage, WSMessage, _] = { 
Flow.fromSinkAndSource(chatSink, chatSource) 
} 

def chat(): WebSocket = { 
    WebSocket.acceptOrResult[WSMessage, WSMessage] { 
    case rh if sameOriginCheck(rh) => 
     Future.successful(userFlow).map { flow => 
     Right(flow) 
     }.recover { 
     case e: Exception => 
      val msg = "Cannot create websocket" 
      logger.error(msg, e) 
      val result = InternalServerError(msg) 
      Left(result) 
     } 

    case rejected => 
     logger.error(s"Request ${rejected} failed same origin check") 
     Future.successful { 
     Left(Forbidden("forbidden")) 
     } 
    } 
} 

私はチャットルームで交換されたメッセージをDBに保存します。

マップとフォールド機能をソースとシンクに追加して、送信されたメッセージを取得しようとしましたが、できませんでした。

Iは

val flow = Flow[WSMessage].map(element => println(s"Message: $element")) 
source.via(flow).toMat(sink)(Keep.both).run() 

以下のようMergeHubとBroadcastHub間を流れる段を追加しようとしたが、それはそのような署名でtoMatを参照することができないコンパイルエラーをスロー。

送信されたメッセージを取得してDBに保存するには、どうすればよいか教えてください。完全なテンプレートの

リンク:

https://github.com/playframework/play-scala-chatroom-example

+0

追加してください。フローステージをどのように追加しようとしたかを示すコード(プロトタイプ)。また、DBにデータを保存する際のフローステージの定義も表示します。 @FredericA。 –

+0

フローステージを追加しようとしたコードを追加しました。私はこれが初めてだから間違っているかもしれない。 – practice2perfect

+0

実際にこのコードをフローに使用しましたか?それは明らかに間違っており、コンパイルエラーを説明します –

答えて

1

さんがあなたの流れを見てみましょう:

val flow = Flow[WSMessage].map(element => println(s"Message: $element")) 

それはタイプWSMessageの要素を取り、何も(Unit)を返しません。ここでは、正しいタイプで、再びです:

val flow: Flow[Unit] = Flow[WSMessage].map(element => println(s"Message: $element")) 

シンクがWSMessageなくUnitを期待するように、これは明らかに動作しません。大体、データベース内のメッセージを永続化するために

val flow = Flow[WSMessage].map { element => 
    println(s"Message: $element") 
    element 
} 

ないように、あなたが最も可能性の高い非同期のステージを使用したいと思うでしょう:ここ

は、あなたは上記の問題を解決することができる方法だ

val flow = Flow[WSMessage].mapAsync(parallelism) { element => 
    println(s"Message: $element") 
    // assuming DB.write() returns a Future[Unit] 
    DB.write(element).map(_ => element) 
} 
+0

ありがとう@Fredric A.それは私がそれがどのように動作するかを理解するのに役立ちます。 – practice2perfect

+0

もう1つの疑問があります。私が与えた例では、あるクライアントから別のクライアントにメッセージを送信するように動作します。 私は、時々サーバーから動的に生成されるメッセージをすべてのクライアントにブロードキャストしたいのですが、どうすればいいですか? – practice2perfect

+0

私はあなたが新しい 'Source'を作成することでそれを達成できると思います(そのソースはあなたが説明したようにサーバーです)。現在、あなたの既存の 'source.via(flow)'がソースであることを理解すれば、今では2つのソースをマージする必要があります。たとえば、 'source.via(flow).combine(serverEventsSource)(Merge)を実行できます。toMat(シンク)(Keep.both).run() ' –

関連する問題