2017-06-25 10 views
1

私はメインスレッドで実行しているベースRxJava 2 Observableを持っています。私はobserveOnに別のスレッドで計算を実行するように呼びます。次にcountをサブスクライブするためにautoConnectを使用し、別の副作用操作(計算結果を書き出す)を使用しています。この例では、これはSystem.out.printlnと表示されていますが、これはしばらく時間がかかる操作であるとふりをします。カウントが終了する可能性のある競合状態を回避したいが、計算の最後の要素の結果を他のスレッドに書き出すことは完了しない。これを行うにはObservableを別のスレッドで実行しているのをどのように数えてブロックしますか?

、私は両方の観察可能な操作の完了時にメインスレッドブロックことを確実にするために、副作用の操作にblockingSubscribeと呼ばれ、そして最終的に将来に.get()と呼ばれ、.count()コールからFutureを作成しました。最後の要素を書くことを失うのを避けるためにここにはより良い解決策がありますか?私はこれを行う必要がありますか、またはObservablesが私が行方不明になっていることを保証するものはありますか?

Observableの最初の要素の計算を繰り返さないようにすることも重要です。これは、多くの高価な手順の後に発生する可能性があるためです。

サンプルコードを以下に示します。

Stream<Integer> ints = ImmutableList.of(1, 2, 3, 4, 5, 6) 
    .stream().map(i -> { 
     System.out.println("emitting:" + i); 
     return i; 
    }); 

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator()) 
    .observeOn(Schedulers.computation()) 
    .map(x -> x + 1) 
    .publish() 
    .autoConnect(2); 

Future<Long> count = observable.count().toFuture(); 
// pretend that instead of printing here, this operation 
// could take some time to complete 
observable.blockingSubscribe(i -> System.out.println("got " + i)); 
Long c = count.get(); 

return count; 

答えて

2

の代わりに、2を作成し、あなたが単一のストリームですべての作業を行うことができますストリーム:

Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator()) 
     .observeOn(Schedulers.computation()) 
     .map(x -> x + 1); 

observable.doOnNext(integer -> { 
    //your heavy work here 
}) 
     .count() 
     .subscribe(aLong -> { 
      //do something with the final count 
     }); 

大量の計算を行うために副作用doOnNext()を使用し、count()オペレータの後にサブスクライブでは、やります最終的な数を持つ何か。 これは実行の順序を保証します。通知は同時に発生しません。これはObservable契約の一部です。doOnNext()は、同じ理由で、最終的にonNext()のサブスクライバで行われます。count()がベースですソースが完了するとObservableonComplete通知)、すべてのアイテムが放出されて処理された後に発生します。doOnNext()

+0

これを行う別の方法はありますか?基本的に副作用のある別のマップ呼び出しを追加することを考えましたが、これは本質的にこれが行っていることですが、最初に観測したいものがたくさんある場合、これはすぐに面倒になり、私の意見では最高のデザイン。 –

+0

別の地図の呼び出しはどういう意味ですか?どこ?まあ、これは単なる提案であり、ポイントは1つのストリームを持つことができ、2つのストリームを持つことができ、観測可能なものを再生して、concat()を使用して順次実行することができます。どちらの方法でも、ブロッキングストリームの使用は理想的ではないと思います。 – yosriz

+0

最高のデザインは、ここで実際に行われた作業が数に関係する重労働であることに非常に依存していますか?両方に関連して観測可能なソースですか? Observableには出力が必要ですか?いずれにせよ、単一のストリームでも、それを分割してロジックをアプリケーションのさまざまな部分に追加することができます – yosriz

関連する問題