2017-04-24 16 views
1

特別なファイル名を検出している2つの頂点があるとします(たとえば、何でも構いません)。イベントバスに公開してください。 1は、ファイルシステムからREST APIと別の名前を読んで:イベントバスの終了と結果の取得

ScanRestVerticle.java

/** 
* Reads file names through an REST API 
*/ 
public class ScanRestVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL"); 

     req.toObservable() 
      .flatMap(HttpClientResponse::toObservable) 
      .lift(unmarshaller(Model.class)) 
      .subscribe(c -> vertx.eventBus().publish("address", c.specialName())); 
     req.exceptionHandler(Throwable::printStackTrace); 
     req.end(); 
    } 
} 

ScanFsVerticle.java

/** 
* Reads file names from a file 
*/ 
public class ScanFsVerticle.java extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     StringObservable.byLine(vertx.fileSystem() 
      .rxReadFile("myFileNames.txt") 
      .map(Buffer::toString) 
      .toObservable()) 
      .subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage())); 
    } 
} 

すべてが今、ここで素晴らしい作品が、私はイベントバスからこれらの名前を組み合わせてSTDに出力するverticleを持っています.OUT:

PrintVerticle.java

public class PrintVerticle extends AbstractVerticle { 

    @Override 
    public void start() throws Exception { 
     vertx.eventBus() 
      .<JsonObject>consumer("address") 
      .bodyStream() 
      .toObservable() 
      .reduce(new JsonArray(), JsonArray::add) 
      .subscribe(j -> System.out.println(j.toString())); 
} 

問題は、私が考えるように、イベントバスは無限のストリームをしているのでが、実際に完了されることはありませんここでを減らすことです。

実際にこの操作を完了し、両方の頂点によって発行された名前を印刷するにはどうすればよいですか?

注:私はvert.xとRXで本当に新たなんだと私はいくつかの作品を見つからないか、間違った何かを得るかもしれないので、裁判官:)事前に

おかげで」いけないしてください。

EDIT:中間結果を得るにはreduce()の代わりにscan()に電話することができますが、どうすればscan().last()を得ることができますか?

+0

完成の定義は何ですか? 2つの情報源によって公開された2つの名前をすべて収集したい場合は、2つのイベントを区別する必要があります – yosriz

+0

質問ありがとう!例で分かるように、私は両方の "スキャン"頂点からすべての名前を集め、それらを "印刷"の頂点に結合したいと思います...もちろんこれは単純化された例です。 –

答えて

1

あなたは正しく仮定していますが、reduce()オペレータはonComplete()イベントに依存して、収集を停止する場所を知っています。
scan()は、ソースストリームの新しいエミッションごとに累積された値を出力するアキュムレータであるため、スキャンによって中間結果が得られます。
ここでの問題は、EventBus、概念的にはEventBusが無限ストリームなので、onComplete()は決して呼び出されるべきではありません。技術的には、あなたが `EventBus、close()の時に呼ばれるかもしれませんが、私はそうすべきではなく、それに頼るべきではないと思います。

  • あなたがEventBusを使いたいならば、あなたがonComplete()イベントを持っていますので、あなたが何らかの形で、あなたのPrintVerticleストリームを終了する必要があります。たとえば、EventBusを使用して、2つの観測された頂点のそれぞれを公開するイベントを公開することができます。この2つのイベントが放出された後で、それを解凍してストリーム(takeUntil()を使用)をPrintVerticleに終了することができます。このような
    何か:

    Observable vertice1EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice1_end") 
        .bodyStream() 
        .toObservable(); 
    
    Observable vertice2EndStream = vertx.eventBus() 
        .<JsonObject>consumer("vertice2_end") 
        .bodyStream() 
        .toObservable() 
    
    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object()); 
    
    vertx.eventBus() 
        .<JsonObject>consumer("address") 
        .bodyStream() 
        .toObservable() 
        .takeUntil(bothVerticesEndStream) 
        .reduce(new JsonArray(), JsonArray::add) 
        .subscribe(j -> System.out.println(j.toString())); 
    
  • 別のオプションは、代わりに直接EventBusの2つのストリームを使用することです。それらを一緒にマージしてreduce()を使用すると、それらの2つのストリームは有限のストリームなので、問題はありません。

+0

提案していただきありがとうございます...私は 'EventBus'と一緒に行くつもりだと思っていましたが、Observablesだけを使った方が実際には簡単で、もっと純粋な感じです。 –

関連する問題