2017-10-03 27 views
0

私はアイテムを放出し、それらをサーバにアップロードするオブザーバブルを持っています。ここでRxJava 2 flatMapCompletableで観測できません

はコードです:

repository 
      .getItems() 
      .doOnComplete(() -> Log.d(TAG, "No items left.")) 
      .flatMapCompletable(item -> 
        repository 
          .uploadItems(item) 
          .onErrorComplete() 
          .andThen(
            deleteTemporaryItem() 
              .onErrorComplete() 
          ) 
      ); 

getItemsメソッドは、アイテムを一つずつを発し、その後、完了、uploadItems方法は、サーバーにアップロードします。問題は、すべてのチェーンonCompleteイベントが正常に機能し、すべての購読者がこのイベントを取得して処理を進めるが、いくつかのアイテムがあり、すべてがonCompleteイベントがアップロードされたときに.doOnComplete() - > Log.d(TAG、 "No items left。"))メソッドであり、すべてのサブスクライバはこのイベントを取得しません。私はonErrorCompleteを追加して、uploadItemsの後のすべてのメソッドが完了し、すべてのメソッドが完了していることをログで確認しましたが、repository.getItems()のonCompleteイベントはすべてのサブスクライバに渡されません。

誰でもこの現象が発生する原因を突き止めてください。

ありがとうございます!

+0

申し訳ありませんが、私は問題が何かを得ることはありません。使用したメソッドの問題とメソッドシグネチャに関する詳細情報を提供してください。 –

+0

@HansWurstこんにちは!私はちょうど質問を編集しました!ご注意いただきありがとうございます! –

+0

私はいくつかの誤解があると思います。 observableが終了したら、DoOnCompleteが呼び出されます。 observableは、getItems()から生成されたすべての値が処理された場合にのみ終了します。すべてのアイテムがCompleteable.completeで終了すると、オブザーバブルは完了します。フラットマップされた1つのアイテム(アップロードするラージファイル)が処理されていないように見えます。 –

答えて

0

この例を見てください:

は、私は、各工程を経アイテムを渡すので、購読は処理された各項目に通知されます。処理パイプラインには、ファイルのアップロードと削除が含まれます。

実装を変更し、出力のログを投稿してください。

@Test 
void name() throws Exception { 
    Flowable<Integer> completed_work = Flowable.just(1, 2, 3) 
      .map(integer -> integer * 1000) 
      .flatMapSingle(integer -> 
        Completable.fromAction(() -> { 
         Thread.sleep(integer); 
         // do upload stuff here 
        }) 
          .doOnComplete(() -> System.out.println("Uploaded file ....")) 
          //.timeout(10, TimeUnit.SECONDS) 
          .retry(3) 
          .andThen(
            Completable.fromAction(() -> { 
             // do delete stuff... 
            }) 
              .retry(2) 
              //.timeout(10, TimeUnit.SECONDS) 
              .doOnComplete(() -> System.out.println("Deleted file ...")) 
          ) 
          .toSingle(() -> integer) 
      ) 
      .doOnComplete(() -> System.out.println("Completed work")); 


    completed_work.test() 
      .await() 
      .assertResult(1000, 2000, 3000); 
} 
関連する問題