2017-03-14 55 views
0

RxJavaでは、データリスト上で非同期メソッドを順次実行する方法はありますか?以下の方法で順次オブジェクトの配列を超える関数を呼び出すことが可能であるNode.jsのAsyncモジュールを使用してRxJavaで非同期メソッドを同期する方法は? RxJavaの非同期ウォーターフォール

は:

var dataArray = ['file1','file2','file3']; // array of arguments 
var processor = function(filePath, callback) { // function to call on dataArray's arguments 
    fs.access(filePath, function(err) { // perform an async operation 
     // process next item in dataArray only after the following line is called 
     callback(null, !err) 
    } 
}; 
async.every(dataArray, processor, function(err, result) { 
    // process results 
}); 

何これについての素晴らしいのは、processor内で実行されるコードです非同期タスクが完了すると、callbackを実行することができます。つまり、dataArray内の各オブジェクトは、並列ではなく次々に処理されます。

さて、RxJavaに私が呼び出すことによってdataArray以上の処理機能を呼び出すことができます。

String[] dataArray = new String[] {"file1", "file2", "file3"}; 
Observable.fromArray(dataArray).subscribe(new Consumer<String>() { // in RxJava 1 it's Action1 
    @Override 
    public void accept(@NonNull String filePath) throws Exception { 
     //perform operation on filePath 
    } 
}); 

をしかし、私はfilePath上の非同期操作を実行する場合、私はdataArrayの項目について順次実行を確保することができますか?線に沿って何か私は何を探していると思います:

String[] dataArray = new String[] {"file1", "file2", "file3"}; 
Observable.fromArray(dataArray).subscribe(new Consumer<String>() { 
    // process next item in dataArray only after the callback is called 
    @Override 
    public void accept(@NonNull String filePath, CallNext callback) throws Exception { 
     // SomeDatabase will call callback once 
     // the asynchronous someAsyncOperation finishes 
     SomeDatabase.someAsyncOperation(filePath, callback); 
    } 
}); 

さらに、どのように私はいくつかは、一度dataArrayのすべての項目が処理されたコードを呼び出すのですか?ソート済みのリスナー?

注:おそらく私はRXの概念が間違っていると思います。 RxJavaを使ってNode.jsの非同期パターンを実装する方法についてのガイドラインが見つからないので、私は尋ねています。また、私はAndroidを使用しており、ラムダ機能は使用していません。

+0

concatMap()を見ましたか?このようなことを試してください Observable.from(fileNameList).concatMap((fileName) - > processFile(fileName).susbscribeOn(Schedulers.io()))。subscribe() – Buckstabue

答えて

0

非同期演算(計算など)を行うための非常に単純で簡単な方法の1つは、RxJava Async Utilsライブラリ(私の書かれていない、私たちのコードで使用されている)を使用することです。バックグラウンドスレッドでメソッドを実行するための

  • 最も簡単な方法は、メソッドの呼び出しが完了した後に渡されたメソッドの戻り値を生成Observable<T>を返すAsync.start(this::methodToCall, Schedulers.io())を使用することです。
  • 他にも多くの方法があります。あなたはObservable.from()concatMap()(または同様の方法)を使用する場合はObserver<T>

を介して中間結果を報告最初.observerOn(Schedulers.io())またはあなたが望む任意の他のスケジューラとバックグラウンドスケジューラに処理を移動することを忘れないでください。

+0

concatMap()パスありがとう!副作用として:* RxJavaAsyncUtils *これまでに見たことがありますが、ドキュメントを見つけることができませんでしたので、今は使用しないことにしました。また、this :: methodToCallは動作しません。Java 7です。使用する必要があります。いずれにせよ提案してくれてありがとう! – Voy

+0

Func0 の匿名クラスの実装を使用する従来の方法を使用することができますが、ラムダ構文を使用する必要はありません。あるいは、さらに良いことに、コンパイル時にJava 8ラムダ構文をJava 7クラスに変換する[Retrolambda](https://github.com/orfjackal/retrolambda)を使うことができます。私はそれを自分のAndroidアプリ用に使います。 Async Utilsにはインターフェースを説明するJavadocもあります。プロジェクトページにはリンクされていないので、修正する必要があります。 – etan

0

オブジェクトの配列に対して順次関数を呼び出すには、非同期のファシリティを必要としません。配列のforループを作成するだけです。 ループを非同期で実行する必要がある場合は、必要な非同期を記述してください:配列が非同期で満たされた後にループを開始するか、計算の結果に非同期で反応させたい、あるいはその両方。どちらの場合でも、クラスCompletableFutureが役に立ちます。

+0

まあ、配列上の関数を連続して呼び出すのは簡単なfor-loopを使うことができますが、ループ内で実行されるコードは非同期でループの後で配列から更新されたオブジェクトを処理する必要があります。私はさまざまな場所でこの機能を必要としていましたが、私が今作業しているのは、Postオブジェクトのリストを与えて、Glideを使ってサーバーから適切なイメージを取得する機能です。同時リクエスト。私はここで頑丈さのためにスピードを失っても構いません。 – Voy

1

試行錯誤の数日後、この質問に使用した解決策は次のとおりです。改善の提案は大歓迎です!何が起こっているか要するに

private Observable<File> makeObservable(final String filePath){ 
    return Observable.create(new ObservableOnSubscribe<File>() { 
     @Override 
     public void subscribe(final ObservableEmitter<File> e) throws Exception { 
      // someAsyncOperation will call the Callback.onResult after 
      // having finished the asynchronous operation. 
      // Callback<File> is an abstract code I put together for this example 
      SomeDatabase.someAsyncOperation(filePath, new Callback<File>(){ 
       @Override 
       public void onResult(Error error, File file){ 
        if (error != null){ 
         e.onError(error); 
        } else { 
         e.onNext(file); 
         e.onComplete(); 
        } 
       } 
      }) 
     } 
    }); 
} 

private void processFilePaths(ArrayList<String> dataArray){ 
    ArrayList<Observable<File>> observables = new ArrayList<>(); 
    for (String filePath : dataArray){ 
     observables.add(makeObservable(filePath)); 
    } 
    Observable.concat(observables) 
      .toList() 
      .subscribe(new Consumer<List<File>>() { 
       @Override 
       public void accept(@NonNull List<File> files) throws Exception { 
        // process results. 
        // Files are now sequentially processed using the asynchronous code. 
       } 
      }); 
} 

  1. 電源を入れdataArrayの配列に観測可能なの
  2. 各観察可能なサブスクリプション時の非同期操作を実行し、非同期操作の完了後onNextにデータをフィード
  3. Observableの配列にObservable.concat()を使用すると、順次実行が保証されます。
  4. U

    1. 私はわからない:このコードは、私が必要とするもの、私はいくつかの理由のために、このソリューションに完全に満足していないんが

    1つのリストに観察可能なのの結果を結合するSE .toList()Observable.create()...subscribe(){内の非同期コードを実行するとObservableを使用する正しい方法です。

  5. 私は私が達成しようとしている行動がdataArray内の各アイテムの新しい観測を作成する必要がわからないんだけど
  6. ref docs)Observable.createを使用するには、注意して使用する必要がありますことをお読みください。逐次的にデータを公開することができる観測可能性を持つことは、私にとっては自然なことですが、それは私の古い思考スタイルかもしれません。
  7. 私はconcatMapを使ってこのトリックを行うべきだと読んだことがありますが、この例にどのように適用するか決して決めることはできませんでした。concatはただのトリックです。誰か2つの違いを説明する気に?

私も前に観察可能なのの配列に.zip機能を使用してみました、と私は最後に結果の一覧を取得するために管理しながら、非同期操作がない順次並列に実行されました。

関連する問題