2015-12-15 13 views
6

私のシナリオでは、クライアントは「さようなら」websocketメッセージを送信し、以前に確立した接続をサーバ側で閉じる必要があります。アッカ-HTTP docsからサーバからのakka-http websocket接続を閉じる

閉会の接続は、(Sink.cancelledとSource.emptyにその上流にその下流に接続することにより)、サーバー・ロジックからの着信接続の流れをキャンセルすることにより可能です。また、IncomingConnectionのソース接続をキャンセルして、サーバーのソケットをシャットダウンすることもできます。

は、しかし、それは新しい接続をネゴシエートするときSinkSourceが一度設定されていることを考慮に入れていることを行う方法を私にはっきりしていない:

(get & path("ws")) { 
    optionalHeaderValueByType[UpgradeToWebsocket]() { 
    case Some(upgrade) ⇒ 
     val connectionId = UUID() 
     complete(upgrade.handleMessagesWithSinkSource(sink, source)) 
    case None ⇒ 
     reject(ExpectedWebsocketRequestRejection) 
    } 
} 

答えて

3

ヒント:この答えはakka-stream-experimentalバージョン2.0-M2に基づいています。 APIは、他のバージョンでは若干異なる場合があります。通過するすべての要素クライアント側で、または(サーバー側であって、一般に受け入れられ

import akka.stream.stage._ 

val closeClient = new PushStage[String, String] { 
    override def onPush(elem: String, ctx: Context[String]) = elem match { 
    case "goodbye" ⇒ 
     // println("Connection closed") 
     ctx.finish() 
    case msg ⇒ 
     ctx.push(msg) 
    } 
} 

すべての要素:


接続をクローズする簡単な方法は、PushStageを使用することですa Flow)は、このようなStageコンポーネントを通過します。 Akkaでは、完全抽象はGraphStageと呼ばれ、詳細はofficial documentationにあります。

PushStageでは、具体的な入力要素をその値で見ることができ、それに応じてコンテキストを変換することができます。上記の例では、goodbyeメッセージが受信されるとコンテキストを終了します。それ以外の場合は、pushメソッドで値を転送します。

今、我々はtransform方法により任意のフローにcloseClientコンポーネントを接続できます。

val connection = Tcp().outgoingConnection(address, port) 

val flow = Flow[ByteString] 
    .via(Framing.delimiter(
     ByteString("\n"), 
     maximumFrameLength = 256, 
     allowTruncation = true)) 
    .map(_.utf8String) 
    .transform(() ⇒ closeClient) 
    .map(_ ⇒ StdIn.readLine("> ")) 
    .map(_ + "\n") 
    .map(ByteString(_)) 

connection.join(flow).run() 

を上記フローはByteStringを受信し、ByteStringを返し、それがjoin介しconnectionに接続することができることを意味します方法。フローの内部では、最初にバイトを文字列に変換してから、closeClientに送信します。 PushStageがストリームを終了しない場合、ストリーム内で要素が転送され、stdinからの入力によって置き換えられます。その後、stdinはワイヤを介して返送されます。ストリームが終了した場合は、ステージコンポーネント後のすべてのストリーム処理ステップが削除され、ストリームが閉じられます。

+0

ありがとうございました!私が探していたもののように見えます。 – Tvaroh

+0

残念ながら、これは 'in'フローに適用されたときにwebsocket接続(akka-http上)を閉じることはありません。たぶんそれはまた、「仕上げ」されたソースを必要とするでしょう。 – Tvaroh

+0

はい、アッカ-HTTP彼らは「完成」しているの両方の接続をクローズします: 'Flow' - 提案として'それをfinish'ingと(出力の) 'Source'で - その基礎となる俳優を停止しています。 – Tvaroh

2

これはアッカストリーム

package com.trackabus.misc 

import akka.stream.stage._ 
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} 

// terminates the flow based on a predicate for a message of type T 
// if forwardTerminatingMessage is set the message is passed along the flow 
// before termination 
// if terminate is true the stage is failed, if it is false the stage is completed 
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true) 
    extends GraphStage[FlowShape[T, T]] 
{ 
    val in = Inlet[T]("TerminateFlowStage.in") 
    val out = Outlet[T]("TerminateFlowStage.out") 
    override val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 

     setHandlers(in, out, new InHandler with OutHandler { 
     override def onPull(): Unit = { pull(in) } 

     override def onPush(): Unit = { 
      val chunk = grab(in) 

      if (pred(chunk)) { 
      if (forwardTerminatingMessage) 
       push(out, chunk) 
      if (terminate) 
       failStage(new RuntimeException("Flow terminated by TerminateFlowStage")) 
      else 
       completeStage() 
      } 
      else 
      push(out, chunk) 
     } 
     }) 
    } 
} 

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe]) 

あなたのステージを定義し、一部としてそれを含む使用の(2.4.14)、現在のバージョンに従うことによって達成することができますフローの

.via(termOnKillMe) 
関連する問題