2016-04-10 12 views
0

私は、Akka HTTPを使用してWebソケットサービスを構築することを試しています。私は完全に到着する厳密なメッセージを処理する必要があり、m個のフレームに到着するストリームメッセージを処理する必要があります。私はhandleWebSocketMessages()でWebソケットの処理をフローに渡すためにルートを使用しています。AkkaのStrictとStreamed WebSocketメッセージの両方を消費する

val route: Route = 
    get { 
    handleWebSocketMessages(createFlow()) 
    } 

def createFlow(): Flow[Message, Message, Any] = Flow[Message] 
    .collect { 
    case TextMessage.Strict(msg) ⇒ msg 
    case TextMessage.Streamed(stream) => ??? // <= What to do here?? 
    } 
    .via(createActorFlow()) 
    .map { 
    case msg: String ⇒ TextMessage.Strict(msg) 
    } 

def createActorFlow(): Flow[String, String, Any] = { 
    // Set Up Actors 
    // ... (this is working) 
    Flow.fromSinkAndSource(in, out) 
} 

私は2つの厳格なとストリーミングの両方のメッセージを処理する方法は本当にわからない。私が持っているコードは次のようになります。

.collect { 
    case TextMessage.Strict(msg) ⇒ Future.successful(msg) 
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _) 
    } 

しかし、今、私の流れは明らかに私がメッセージを処理する必要があり、特に以来、私はその後、どのように処理するかを確認していない文字列だけ、ではなく、未来の[文字列]を処理しなければならない:私はこのような何かを行うことができます実現します順番に。

私はこのakkaの問題を見ましたが、それは多少関連しているようですが、私が必要とするものではありません(私は思っていませんか?)。

https://github.com/akka/akka/issues/20096

すべてのヘルプは

答えて

0

に基づいて究極の答え以下(svezfazのおかげで)答えはこのようになった:

val route: Route = 
    get { 
    handleWebSocketMessages(createFlow()) 
    } 

def createFlow(): Flow[Message, Message, Any] = Flow[Message] 
    .collect { 
    case TextMessage.Strict(msg) ⇒ 
     Future.successful(MyCaseClass(msg)) 
    case TextMessage.Streamed(stream) => stream 
     .limit(100)     // Max frames we are willing to wait for 
     .completionTimeout(5 seconds) // Max time until last frame 
     .runFold("")(_ + _)   // Merges the frames 
     .flatMap(msg => Future.successful(MyCaseClass(msg))) 
    } 
    .mapAsync(parallelism = 3)(identity) 
    .via(createActorFlow()) 
    .map { 
    case msg: String ⇒ TextMessage.Strict(msg) 
    } 

def createActorFlow(): Flow[MyCaseClass, String, Any] = { 
    // Set Up Actors as source and sink (not shown) 
    Flow.fromSinkAndSource(in, out) 
} 
2

折りたたみが賢明な選択肢のように聞こえるにappriciatedされるだろう。あなたのストリームに将来を処理する使用して行うことができる(例えば)

flowOfFutures.mapAsync(parallelism = 3)(identity) 

mapAsyncがdocsごとに、受信メッセージの順序を維持していることに注意してください。

別のノートでは、ストリーミングされたWSのメッセージを処理するために、他の賢明な予防策が折りたたまれるメッセージのためにバインドされた時間と空間にcompletionTimeoutと制限を使用することができ(例えば)

stream.limit(x).completionTimeout(5 seconds).runFold(...) 
+0

ありがとうsvezfaz。あなたのヒントは私の問題を解決するのに役立ちました。私は今働いている。 –

+0

この場合の便利なメソッドの追加は、途中で私たちのレーダーにあります。https://github.com/akka/akka/issues/20096 –

+0

ありがとうございました。私はこの問題に登録しました。私は、この質問/回答が人々を助けてくれることを祈っています。 –

関連する問題