2016-04-30 3 views
5

私のアプリケーションには、Akka-Websocketインターフェイスがあります。 Webソケットは、アクタ - サブスクライバとアクタパブリッシャで構成されています。サブスクライバは、対応するアクタにコマンドを送信してコマンドを処理します。パブリッシャーはイベントストリームをリッスンし、ストリームに更新情報を公開します(最終的にクライアントに公開します)。これはうまくいく。Webソケット接続のサブスクライバからパブリッシャへのリアクティブストリームでメッセージを送信する方法

私の質問:サブスクライバがイベントをストリームに戻すにはどうすれば可能ですか?例えば、受信したコマンドの実行を確認する。

public class WebSocketApp extends HttpApp { 

    private static final Gson gson = new Gson(); 

    @Override 
    public Route createRoute() { 
    return get(
     path("metrics").route(handleWebSocketMessages(metrics())) 
     ); 
    } 

    private Flow<Message, Message, ?> metrics() { 
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props()); 
    Source<Message, ActorRef> metricsSource = 
     Source.actorPublisher(WebSocketDataPublisherActor.props()) 
     .map((measurementData) -> TextMessage.create(gson.toJson(measurementData))); 
    return Flow.fromSinkAndSource(metricsSink, metricsSource); 
    } 
} 

素敵な解決策は、サブスクライブしている俳優(上記のコードでWebSocketCommandSubscriber俳優)バックsender().tell(...)などのストリームにメッセージを送ることができることを、可能性が...

答えて

4

いいえ、それは、可能ではありません少なくとも直接的ではありません。ストリームは常に一方向です。すべてのメッセージは一方向に流れますが、要求は反対方向に流れます。シンクアクタにソースアクタを登録するなどして、確認メッセージをシンクからソースに渡す必要があります。あなたのシンクの俳優がRegisterSourceメッセージを受信した後、それは、出力ストリームに転送します提供ActorRefに確認メッセージを送信することができ、その後

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> { 
    sinkActor.tell(new RegisterSource(sourceActor), null); 
}) 

:それはこのようになります。

関連する問題