2017-04-16 5 views
0

私はRxJavaを初めて使いました。フラットマップは、放出されたアイテムを観測可能なものにマッピングするためのものです。私はまた、documentationに基づいて、放出されたオブザーバブルがすべて単一の観測可能なストリームに結合(平坦化)されることも知っています。RxJavaフラットマップ:結果の観測対象の1つが完成するとどうなりますか?

これらの内視鏡が完成すればどうなるのでしょうか?

たとえば、私はアイテムデータキーを発行するオブザーバブルを持っています。サーバーからアイテムデータを取得するために別の非同期http呼び出しを行う必要があります。そのため、別のオブザーバブルを使用して呼び出します。フラットマップを使用してこれらの2つを接続し、1つの主要観測点を作成します。

「SomeMethodThatWantsItems」に続くrun()メソッドはいつ呼び出されますか?

public void someMethodThatWantsItems(MyHttpCaller httpCaller, SomeSearchEngine searchEngine) 
{ 
    Consumer<Item> onNextConsumer = 
    Observable<Item> searchObservable = getSearchResult(httpCaller, searchEngine, "The Search Word"); 
    searchObservable 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(new Consumer<Item>(){ 
          @Override 
          public void accept(@NonNull Item item) throws Exception { 
           //Do stuff with the item 
          } 
         } 
       , new Consumer<Exception>() { //some implementation of onErrorConsumer 
        } 
       //OnComplete 
       , new Action(){ 

         @Override 
         public void run() throws Exception { 
          //When does this get called??? after the search complete or when the first http call is successful? 
         } 
        }); 

} 

private Observable<String> getSearchResultKeys(SomeSearchEngine searchEngine, String someSearchWord) 
{ 
    return Observable.create(new ObservableOnSubscribe<String>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<String> emitter) throws Exception { 

      //Assume that our search engine call onFind everytime it finds something 
      searchEngine.addSearchListener(new searchEngineResultListener(){ 
       @Override 
       public void onFind(String foundItemKey){ 
        emitter.onNext(foundItemKey); 
       } 

       @Override 
       public void onFinishedFindingResults(){ 
        emitter.onComplete(); 
       } 
      }); 

     } 
    }); 
} 

private Observable<Item> getItemByKey(MyHttpCaller httpCaller, String key) 
{ 

    return Observable.create(new ObservableOnSubscribe<Item>() { 
     @Override 
     public void subscribe(@NonNull final ObservableEmitter<Item> emitter) throws Exception { 

      //Call the server to get the item 
      httpCaller.call(key, new onCompleteListener(){ 
       @Override 
       public void onCompletedCall(Item result) 
       { 
        emitter.onNext(result); 
        //The result is complete! end the stream 
        emitter.onComplete(); 
       } 
      }); 
     } 
    }); 
} 

public Observable<Item> getSearchResult(MyHttpCaller httpCaller, SomeSearchEngine searchEngine, String someSearchWord){ 
    //Where everything comes together 
    Observable<String> searchResultObservable = getSearchResultKeys(searchEngine, someSearchWord); 
    retuern searchResultObservable 
      .observeOn(Schedulers.newThread()) 
      .flatMap(new Function<String, Observable<Item>>(){ 
       @Override 
       public Observable<Item> apply(String key){ 
        return getItemByKey(httpCaller, key); 
       } 
      }); 
} 

答えて

3

onComplete()は、常に1回呼び出され、ストリームが停止します。 (これはObservable Contractの一部です)。
つまり、あなたのケースではのSomeMethodThatWantsItemsの後に呼び出され、すべてのアイテムが取得されます。それは基本的にですのでflatMap()の場合
は、各インナーObservableの完了は、単に、flatMap()があれば、このストリームがアイテムを送信するように、内側Observableから項目をマージし、ソースObservableに内側Observableからアイテムを平坦化停止するソースObservableに信号を送ります内部Observableストリーム全体をソースストリームに消費すると、ストリーム全体は終了イベント3(onComplete()など)になるため、内部のObservableが1つ以上のアイテムを放出できる場合、ソースストリームで2回以上放出されることを意味します。

関連する問題