2016-09-21 6 views
2

目標が返さソースがシンクが受信するどの発するこの署名シンクとシンクが

def bindedSinkAndSource[A]:(Sink[A, Any], Source[A, Any]) = ??? 

で機能を実現することである受信どんな放射するソースを作成アッカストリーム機能。

私の主な目標は、handleWebSocketMessagesディレクティブによってWebSocketのフォワーダを実装することです。
フォワーダグラフである:

leftReceiver ~> rightEmitter 
leftEmitter <~ rightReceiver 
leftEmiter leftReceiverとがであり

、左エンドポイントハンドラの流れのうち。 rightReceiverおよびrightEmitterは、右エンドポイント・ハンドラー・フローの内外にあります。例えば

import akka.NotUsed 
import akka.http.scaladsl.model.ws.Message 
import akka.http.scaladsl.server.Directive.addByNameNullaryApply 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.server.Route 
import akka.stream.scaladsl.Flow 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Source 

def buildHandlers(): Route = { 
    val (leftReceiver, rightEmitter) = bindedSinkAndSource[Message]; 
    val (rightReceiver, leftEmitter) = bindedSinkAndSource[Message]; 

    val leftHandlerFlow = Flow.fromSinkAndSource(leftReceiver, leftEmitter) 
    val rightHandlerFlow = Flow.fromSinkAndSource(rightReceiver, rightEmitter) 

    pathPrefix("leftEndpointChannel") { 
     handleWebSocketMessages(leftHandlerFlow) 
    } ~ 
     pathPrefix("rightEndpointChannel") { 
      handleWebSocketMessages(rightHandlerFlow) 
     } 
} 

handleWebSocketMessages(..)ディレクティブは、受信したフローのマテリアライズド値へのアクセスを与えていないという事実に不満を感じた私のところに来たすべてのアイデア。

+0

GraphStageを使用することができます 、http://doc.akka.io/docs/akka/2.4.10/scala/stream/stream-customize.html – gaston

+0

@gaston、私はそれを試みましたが、私はしませんでした方法を見つけ出す。私が知る限り、結果の形状はポートのタイプと数によって異なります。そして、2つのグラフステージをポートを介さずにバインドする方法が見つけられませんでした。私にヒントを送ってもらえますか? – Readren

答えて

1

私は目標を達成する方法を見つけましたが、短く、簡単な方法があるかもしれません。あなたが知っている場合は、あなたの知識を追加することを躊躇しないでください。

import org.reactivestreams.Publisher 
import org.reactivestreams.Subscriber 
import org.reactivestreams.Subscription 

import akka.NotUsed 
import akka.stream.scaladsl.Sink 
import akka.stream.scaladsl.Source 

def bindedSinkAndSource[A]: (Sink[A, NotUsed], Source[A, NotUsed]) = { 

    class Binder extends Subscriber[A] with Publisher[A] { binder => 
     var oUpStreamSubscription: Option[Subscription] = None; 
     var oDownStreamSubscriber: Option[Subscriber[_ >: A]] = None; 
     var pendingRequestFromDownStream: Option[Long] = None; 
     var pendingCancelFromDownStream: Boolean = false; 

     def onSubscribe(upStreamSubscription: Subscription): Unit = { 
      this.oUpStreamSubscription match { 
       case Some(_) => upStreamSubscription.cancel // rule 2-5 
       case None => 
        this.oUpStreamSubscription = Some(upStreamSubscription); 
        if (pendingRequestFromDownStream.isDefined) { 
         upStreamSubscription.request(pendingRequestFromDownStream.get) 
         pendingRequestFromDownStream = None 
        } 
        if (pendingCancelFromDownStream) { 
         upStreamSubscription.cancel() 
         pendingCancelFromDownStream = false 
        } 
      } 
     } 

     def onNext(a: A): Unit = { 
      oDownStreamSubscriber.get.onNext(a) 
     } 

     def onComplete(): Unit = { 
      oDownStreamSubscriber.foreach { _.onComplete() }; 
      this.oUpStreamSubscription = None 
     } 

     def onError(error: Throwable): Unit = { 
      oDownStreamSubscriber.foreach { _.onError(error) }; 
      this.oUpStreamSubscription = None 
     } 

     def subscribe(downStreamSubscriber: Subscriber[_ >: A]): Unit = { 
      assert(this.oDownStreamSubscriber.isEmpty); 
      this.oDownStreamSubscriber = Some(downStreamSubscriber); 

      downStreamSubscriber.onSubscribe(new Subscription() { 
       def request(n: Long): Unit = { 
        binder.oUpStreamSubscription match { 
         case Some(usSub) => usSub.request(n); 
         case None => 
          assert(binder.pendingRequestFromDownStream.isEmpty); 
          binder.pendingRequestFromDownStream = Some(n); 
        } 
       }; 
       def cancel(): Unit = { 
        binder.oUpStreamSubscription match { 
         case Some(usSub) => usSub.cancel(); 
         case None => 
          assert(binder.pendingCancelFromDownStream == false); 
          binder.pendingCancelFromDownStream = true; 
        } 
        binder.oDownStreamSubscriber = None 
       } 
      }) 
     } 
    } 

    val binder = new Binder; 
    val receiver = Sink.fromSubscriber(binder); 
    val emitter = Source.fromPublisher(binder); 
    (receiver, emitter); 
} 

この方法は、作成し、シンクとソースがユーザによって後で融合されていない場合BinderクラスのインスタンスVARSは、並行性の問題を被ることに留意されたいです。そうでない場合、これらの変数へのすべてのアクセスは、同期ゾーン内に囲まれている必要があります。もう1つの解決策は、シンクとソースが単一のスレッドで実行コンテキストで実現されるようにすることです。

2日後、私はMergeHubとBroadcastHubを発見しました。それらを使用すると、返されるシンクとソースを複数回マテリアライズできる利点があります。

import akka.stream.Materializer 
def bindedSinkAndSource[T](implicit sm: Materializer): (Sink[T, NotUsed], Source[T, NotUsed]) = { 
    import akka.stream.scaladsl.BroadcastHub; 
    import akka.stream.scaladsl.MergeHub; 
    import akka.stream.scaladsl.Keep; 

    MergeHub.source[T](perProducerBufferSize = 8).toMat(BroadcastHub.sink[T](bufferSize = 256))(Keep.both) run 
} 

関連する問題