私のアプリケーションには、Akka-Websocketインターフェイスがあります。 Webソケットは、アクタ - サブスクライバとアクタパブリッシャで構成されています。サブスクライバは、対応するアクタにコマンドを送信してコマンドを処理します。パブリッシャーはイベントストリームをリッスンし、ストリームに更新情報を公開します(最終的にクライアントに公開します)。これはうまくいく。Webソケット接続のサブスクライバからパブリッシャへのリアクティブストリームでメッセージを送信する方法
私の質問:サブスクライバがイベントをストリームに戻すにはどうすれば可能ですか?例えば、受信したコマンドの実行を確認する。
public class WebSocketApp extends HttpApp {
private static final Gson gson = new Gson();
@Override
public Route createRoute() {
return get(
path("metrics").route(handleWebSocketMessages(metrics()))
);
}
private Flow<Message, Message, ?> metrics() {
Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
Source<Message, ActorRef> metricsSource =
Source.actorPublisher(WebSocketDataPublisherActor.props())
.map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
return Flow.fromSinkAndSource(metricsSink, metricsSource);
}
}
素敵な解決策は、サブスクライブしている俳優(上記のコードでWebSocketCommandSubscriber
俳優)バックsender().tell(...)
などのストリームにメッセージを送ることができることを、可能性が...