2017-11-07 7 views
1

私は単純な使用例があります。これは、レスポンスストリーム型のバックプレッシャー管理を使用して、残りの呼び出しを行い、mongoをクエリしてから、任意に大きなデータストリームをクライアントに返すことです。vertex.x ReactiveReadStreamを<Document>からReactiveWriteStreamに変換するには<Buffer>

これは、Spring WebFluxとReactorを使用して簡単に達成できました。私は現在、実装の容易さの比較として、vert.xを使用して同じ目標を達成しようとしています。

vertex.xmongoクライアントがバックプレッシャーの管理をサポートしていないことが判明したので、今度はWebFlux mongoクライアントを使用して、次に示すように、vert.x HttpResponseを通してデータをポンピングしようとしています。コードは:

public class MyMongoVerticle extends AbstractVerticle { 

ReactiveMongoOperations operations; 

public void start() throws Exception { 

final Router router = Router.router(vertx); 

router.route().handler(BodyHandler.create()); 

    router.get("/myUrl").handler(ctx -> { 

     // WebFlux mongo operations returns a ReactiveStreams compatible entity 
     Flux<Document> mongoStream = operations.findAll(Document.class, "myCollection"); 

     ReactiveReadStream rrs = ReactiveReadStream.readStream(); 
     // rrs is ReactiveStream streams subscriber 
     mongoStream.subscribe(rrs); 

     // Pump pumps the rrs (ReactiveReadStream) to the HttpServerResponse (ReactiveWriteStream) 
     Pump pump = Pump.pump(rrs, ctx.response()); 
     pump.start(); 

    }); 

    vertx.createHttpServer().requestHandler(router::accept).listen(8777); 
} 
}  

私が遭遇した問題は、HttpServerResponseがそうバッファではなく、ドキュメントのストリームを期待しているReactiveWriteStream <バッファ>を実装していることです。結果はClassCaseExceptionです。

私が持っている質問は、このドキュメントストリームをReactiveWriteStreamに変換する方法です。<バッファ>?これを行うもう1つのより良い方法があるかもしれないので、私はこれを達成する方法に関する他の提案に公開しています。

答えて

0

Pumpは、現在の変換をサポートしていません。自分でポンプを実装する必要があります。幸いなことに、これはあまりにも難しいことではありません。私は、これは「ネイティブ」WebFluxの実装よりも効果的であることを期待していない

 Flux<Document> mongoStream = operations.findAll(Document.class, "myCollection"); 

     ReactiveReadStream<Document> rrs = ReactiveReadStream.readStream(); 
     mongoStream.subscribe(rrs); 

     HttpServerResponse outStream = ctx.response(); 
     // Changes start here 
     rrs.handler(d -> {     
      if (outStream.writeQueueFull()) { 
       outStream.drainHandler((s) -> { 
        rrs.resume(); 
       }); 
       rrs.pause(); 
      } 
      else { 
       outStream.write(d.toJson()); 
      } 
     }).endHandler(h -> { 
      outStream.end(); 
     }); 

注意。

また、この例のJSONは適切なJSON配列にラップしないので、マングルされます

関連する問題