2016-11-17 3 views
0

RxJavaを使用してサーバーからシングルmp3ファイルをダウンロードするapiがあります。RxAndroid複数のファイル、最大3つの同時スレッドをダウンロードします。

Observable<ResponseBody> observable = audioService.getFile(fileNameWithExtension); 
     observable.subscribeOn(Schedulers.newThread()) 
       .observeOn(Schedulers.newThread()) 
       .subscribe(someCallBackClass<ResponseBody>); 

これは単なるファイルをダウンロードするだけで、コールバックによってファイルがディスクに保存されます。 ファイルのリストをディスクに保存し、すべてのダウンロードが完了するまで待つ必要があります。最大3つの呼び出しを同時に実行する必要があります。 どのようにRXAndroidでそれを行うには、私はフラットマップを試みたが、私はそれを完全に理解することができません。 )(

EDIT新しいコード

List<Observable<Response<ResponseBody>>> audioFiles = new ArrayList<>(); 

    for (String fileNameWithExtension : fileNamesWithExtension) { 
     Observable<Response<ResponseBody>> observable = restFactory.getAudioService().getFile(fileNameWithExtension); 
     audioFiles.add(observable); 
    } 

    Observable.from(audioFiles).flatMap(audioFile -> Observable.fromCallable(() -> { 
     audioFile.subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .toBlocking() 
       .subscribe(new CallBackWithErrorHandling<>(Downloader.this)); 
     return 0; 
    }).subscribeOn(Schedulers.io()), MAX_CONCURRENT) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Integer>() { 
       @Override 
       public void onCompleted() { 
        goToMainActivity(); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Log.e(TAG, "Something went wrong , " + Thread.currentThread().getName()); 
        Log.e(TAG, "Something went wrong , " + e.toString()); 
        showToast(R.string.something_went_wrong); 
        goToMainActivity(); 
       } 

       @Override 
       public void onNext(Integer integer) { 
       } 
      }); 

はこの正常に動作しているが、ネットワークがダウンしている場合や、低速のインターネット接続、私は正確にobserveOnする必要のあるスレッドに理解することができません

java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare() 

を取得していますアンドロイドメインスレッド。

答えて

2

あなたは、その同時実行を制限し、flatMapでこれを達成するだけでなく、ファイル転送し、バックグラウンドスケジューラに内部観察可能な実行を必要とすることができます。

fileNames 
.flatMap(name -> { 
     return Observable.fromCallable(() -> { 
      // put your blocking download code here, save the data 
      return name; // return what you need down below 
     }) 
     .subscribeOn(Schedulers.io()); 
}, 3) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }); 

編集:あなたがしているので

をネットワークダウンロードにObservable APIを使用すると、ブロックする必要はありません:

Observable.from(audioFiles) 
.flatMap(audioFile -> 
    audioFile.subscribeOn(Schedulers.io()), // <-- apply extra transforms here 
    MAX_CONCURRENT) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(completedFile -> { }, error -> { }, 
    () -> { /* all completed.*/ }) 

不明ですあなたはCallBackWithErrorHandlingで何をしています。

関連する問題