2017-06-05 1 views
0

以下のコードでは、Rx Nettyを使用して単純な非同期HTTPクライアントを構築しています。私が抱えている問題は、 "java.lang.IllegalStateException:コンテンツストリームが既に破棄されています。"エラー。Rx Netty:非同期クライアントが「コンテンツストリームは既に破棄されています」エラーを返す

私はここで間違っていますか?それは、これに関連している:https://github.com/ReactiveX/RxNetty/issues/264

NioEventLoopGroup provider = new NioEventLoopGroup(); 

HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, ByteBuf>("my-api.com", 80) 
    .eventloop(provider) 
    .build(); 

Gson gson = new Gson(); 

Observable.range(1, 75, Schedulers.from(provider)) 
    .flatMap(count -> { 
     Data data = new Data("test" + count); 

     return client.submit(
      HttpClientRequest.createPost("/create") 
       .withHeader("Authorization", AUTH_HEADER) 
       .withHeader("Content-Type", "application/json") 
       .withContent(gson.toJson(data)) 
    ); 
    }) 
    .flatMap(response -> { 
     return response.getContent().map((ByteBuf content) -> { 
     return gson.fromJson(content.toString(Charset.defaultCharset()), OtherData.class); 
     }); 
    }) 
    .subscribe(
     data -> logger.info("item done"), 
     err -> { 
      logger.error("Error", err); 

      provider.shutdownGracefully(); 
     }, 
     () -> { 
      logger.info("done"); 
      provider.shutdownGracefully(); 
     } 
    ); 
+0

'submit()'の結果が完了したときに 'client'は閉じられませんか? –

+0

@BobDalgleish私はそう信じていない。エラーは定期的に発生しますが、すべてではありません。このコードはエラーなしで動作することがあります。 –

答えて

0

私はあなたがresponse.getContent()を行う際には、地図ではなく、flatMap内部で行われる必要があることに気付きました。例:

NioEventLoopGroup provider = new NioEventLoopGroup(); 

HttpClient<ByteBuf, ByteBuf> client = new HttpClientBuilder<ByteBuf, 
ByteBuf>("my-api.com", 80) 
.eventloop(provider) 
.build(); 

Gson gson = new Gson(); 

Observable.range(1, 75, Schedulers.from(provider)) 
.flatMap(count -> { 
    Data data = new Data("test" + count); 

    return client.submit(
     HttpClientRequest.createPost("/create") 
      .withHeader("Authorization", AUTH_HEADER) 
      .withHeader("Content-Type", "application/json") 
      .withContent(gson.toJson(data)) 
); 
}) 
// map not flatMap 
.map(response -> { 
    return response.getContent().map((ByteBuf content) -> { 
    return gson.fromJson(content.toString(Charset.defaultCharset()), OtherData.class); 
    }); 
}) 
.subscribe(
    data -> logger.info("item done"), 
    err -> { 
     logger.error("Error", err); 

     provider.shutdownGracefully(); 
    }, 
    () -> { 
     logger.info("done"); 
     provider.shutdownGracefully(); 
    } 
); 
関連する問題