2017-11-17 11 views
3

新しいSpringには、Spring documentationのWebSocketClientの例がいくつかあります。使用例ReactorNettyWebSocketClient

WebSocketClient client = new ReactorNettyWebSocketClient(); 
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000); 

しかし、それは明らかに非常に短く、ない:

  1. サーバーにメッセージを送信する方法(チャネルにサブスクライブ)?
  2. その後、着信ストリームを処理してFluxメッセージを送信しますか?
  3. 接続が中断されたときにサーバーに再接続しますか?

もっと複雑な例がありますか?

UPD。 は、私のような何かをしようとした:

public Flux<String> getStreaming() { 

    WebSocketClient client = new ReactorNettyWebSocketClient(); 
    EmitterProcessor<String> output = EmitterProcessor.create(); 
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }"); 

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"), 
      session -> session 
        .send(input.map(session::textMessage)) 
        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()) 
        .then()); 

    return output.doOnSubscribe(s -> sessionMono.subscribe()); 
} 

をしかし、それは1つのメッセージのみを返します。私が購読を得なかったように。

答えて

1

「エコー」サービスを使用しているとします。サービスからいくつかのメッセージを取得するには、それらをwebsocketにプッシュしなければなりません。サービスはそれらをあなたに返送します。

サンプルコードでは、websocketに1つの要素しか書いていません。より多くのメッセージをソケットにプッシュするとすぐに、より多くのメッセージが返されます。

ローカルサービスの代わりにws://echo.websocket.orgに接続するコードを変更しました。 /streamにブラウズすると、毎秒新しいメッセージが表示されます。

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 
public Flux<String> getStreaming() throws URISyntaxException { 

    Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date()))) 
     .delayElements(Duration.ofSeconds(1)); 

    WebSocketClient client = new ReactorNettyWebSocketClient(); 
    EmitterProcessor<String> output = EmitterProcessor.create(); 

    Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage)) 
     .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then()); 

    return output.doOnSubscribe(s -> sessionMono.subscribe()); 
} 

...この情報がお役に立てば幸いです