2016-08-05 2 views
3

Spring 5 Reactive Programming - WebClient ClassCastException when unmarshalling JSON from a Spring Reactive Controller that streams data

This question is related to this one, in which I asked how to stream data from a Reactive Spring Controller.

As Rossen pointed out, you have to use text/event-stream to send the streamed results back as server-sent events.

So I have a service like this:

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming() { 
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
             new Alert((long)2, "Alert message2"), 
             new Alert((long)3, "Alert message3")}) 
       .delayMillis(1000) 
       .log(); 
} 

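Calling it from a browser, the three results start to arrive with a one-second delay. The problem appears when a client consumes the stream. This is the test code: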

@Test 
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class}) 
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AccountsServiceClient client; 

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{ 

     CountDownLatch latch = new CountDownLatch(1); 

     Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080"); 
     alerts.doOnComplete(() -> { 
      latch.countDown(); 
     }).subscribe((n) -> { 
      logger.info("------------> GOT ALERT {}", n); 
     }); 

     latch.await(); 
    } 
} 

I wanted to call this service from a WebClient and implemented it this way:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(Alert.class)); 
     return response; 
    }  
} 

When it gets to extracting the results, none of the HttpMessageReaders can read the combination text/event-stream + Alert.class:

public class ResponseExtractors { 

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders, 
       ResolvableType responseType, MediaType contentType) { 

      return messageReaders.stream() 
        .filter(e -> e.canRead(responseType, contentType)) 
        .findFirst() 
        .orElseThrow(() -> 
          new WebClientException(
            "Could not decode response body of type '" + contentType 
              + "' with target type '" + responseType.toString() + "'")); 
    } 
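
To make the failure easier to see, the lookup above can be imitated with a small stand-alone sketch. The Reader interface, the two sample readers, and the empty Alert class are hypothetical stand-ins rather than Spring types, and the set of readers WebClient really registers is an assumption here. The point is only that a filter/findFirst lookup comes back empty when no reader accepts both the target type and the content type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Simplified imitation of reader resolution. All types here are hypothetical
// stand-ins, not Spring's HttpMessageReader, ResolvableType or MediaType.
class ReaderResolutionSketch {

    interface Reader {
        boolean canRead(Class<?> targetType, String contentType);
    }

    // Assumed JSON-like reader: any target type, but only application/json.
    static final Reader JSON =
            (type, contentType) -> contentType.equals("application/json");

    // Assumed text reader: only String targets, any text/* content type.
    static final Reader TEXT =
            (type, contentType) -> type == String.class && contentType.startsWith("text/");

    // Placeholder for the DTO used in the question.
    static class Alert { }

    // Same shape as the excerpt: first reader that accepts both arguments wins.
    static Optional<Reader> resolve(List<Reader> readers, Class<?> targetType, String contentType) {
        return readers.stream()
                .filter(r -> r.canRead(targetType, contentType))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Reader> readers = Arrays.asList(JSON, TEXT);
        // No reader accepts Alert + text/event-stream, so the lookup is empty.
        System.out.println(resolve(readers, Alert.class, "text/event-stream").isPresent());  // prints false
        // A String target is accepted by the text reader.
        System.out.println(resolve(readers, String.class, "text/event-stream").isPresent()); // prints true
    }
}
```

Under these assumptions, asking for String.class instead of Alert.class finds a reader, which is why reading the stream as text and converting it afterwards is a possible workaround.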

Exception:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) 
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169) 
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161) 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) 
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103) 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) 
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71) 
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) 
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) 
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source) 
    at java.util.Optional.orElseThrow(Optional.java:290) 
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200) 
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181) 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126) 
    ... 37 common frames omitted 
+0

What is the difference between a WebClient and a web browser from the server's point of view? –

+0

I see your point, but apart from the new semantics, what would be the point of using the new `WebClient` instead of the old `RestTemplate` if it doesn't stream the results and instead delivers them all at once? – codependent

Answers

0

Maybe this should be handled automatically by the framework. In any case, I solved it by unmarshalling the JSON stream data myself:

WebClientConfig:

@Configuration 
public class WebClientConfig { 

    @Bean 
    public ObjectMapper jacksonObjectMapper(){ 
     return new ObjectMapper(); 
    } 

    @Bean 
    public WebClient webClient(){ 
     WebClient webClient = new WebClient(new ReactorClientHttpConnector()); 
     return webClient; 
    } 

} 

Service client:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    @Autowired 
    private ObjectMapper jacksonObjectMapper; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(String.class)) 
       .map((e -> { 
        try { 
         e = e.substring(e.indexOf(":")+1); 
         Alert a = jacksonObjectMapper.readValue(e, Alert.class); 
         return a; 
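        } catch (Exception e1) { 
         // Reactor's map() must not return null, so surface the parse failure as an error signal 
         throw new IllegalStateException("Could not parse alert: " + e, e1); 
        } 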

       })); 
     return response; 
    } 

} 
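
The substring call above relies on each received chunk being a single "data:<json>" line. As a rough illustration of what is being stripped, here is a minimal stand-alone sketch that pulls the data payloads out of a text/event-stream body using only the JDK. The class SseDataParser and the JSON field names id and message are assumptions made for the example (the real Alert fields are not shown in the question), and the parser is deliberately simplified: it handles data lines, an optional space after the colon, and blank-line event boundaries, and ignores every other field.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified parser for the "data" fields of a text/event-stream payload.
// Hypothetical helper for illustration; not part of Spring or Reactor.
class SseDataParser {

    // Returns the data payload of each complete event found in the given text.
    static List<String> parse(String payload) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        // Lines may end with CRLF, LF or CR; the -1 limit keeps trailing empty lines.
        for (String line : payload.split("\r\n|\n|\r", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // One optional space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines within one event are joined with newlines.
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (starting with ':') and other fields are ignored here.
        }
        return events;
    }

    public static void main(String[] args) {
        // Field names "id" and "message" are assumed for the example.
        String payload = "data:{\"id\":1,\"message\":\"Alert message\"}\n\n"
                + "data: {\"id\":2,\"message\":\"Alert message2\"}\n\n";
        for (String json : parse(payload)) {
            System.out.println(json);
        }
    }
}
```

Each returned string could then be handed to something like jacksonObjectMapper.readValue(json, Alert.class), as the service client above does.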

UPDATE: As of Spring 5 M4 this is done by the framework. You can check the solution using the new API here: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

0

There is already an issue for this. Please comment/vote on SPR-14539.
