私はメインスレッドで実行しているベースRxJava 2 Observable
を持っています。私はobserveOn
に別のスレッドで計算を実行するように呼びます。次にcount
をサブスクライブするためにautoConnect
を使用し、別の副作用操作(計算結果を書き出す)を使用しています。この例では、これはSystem.out.println
と表示されていますが、これはしばらく時間がかかる操作であるとふりをします。カウントが終了する可能性のある競合状態を回避したいが、計算の最後の要素の結果を他のスレッドに書き出すことは完了しない。これを行うにはObservableを別のスレッドで実行しているのをどのように数えてブロックしますか?
、私は両方の観察可能な操作の完了時にメインスレッドブロックことを確実にするために、副作用の操作にblockingSubscribe
と呼ばれ、そして最終的に将来に.get()
と呼ばれ、.count()
コールからFuture
を作成しました。最後の要素を書くことを失うのを避けるためにここにはより良い解決策がありますか?私はこれを行う必要がありますか、またはObservablesが私が行方不明になっていることを保証するものはありますか?
Observableの最初の要素の計算を繰り返さないようにすることも重要です。これは、多くの高価な手順の後に発生する可能性があるためです。
サンプルコードを以下に示します。
Stream<Integer> ints = ImmutableList.of(1, 2, 3, 4, 5, 6)
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Observable<Integer> observable = Observable.fromIterable(() -> ints.iterator())
.observeOn(Schedulers.computation())
.map(x -> x + 1)
.publish()
.autoConnect(2);
Future<Long> count = observable.count().toFuture();
// pretend that instead of printing here, this operation
// could take some time to complete
observable.blockingSubscribe(i -> System.out.println("got " + i));
Long c = count.get();
return count;
これを行う別の方法はありますか?基本的に副作用のある別のマップ呼び出しを追加することを考えましたが、これは本質的にこれが行っていることですが、最初に観測したいものがたくさんある場合、これはすぐに面倒になり、私の意見では最高のデザイン。 –
別の地図の呼び出しはどういう意味ですか?どこ?まあ、これは単なる提案であり、ポイントは1つのストリームを持つことができ、2つのストリームを持つことができ、観測可能なものを再生して、concat()を使用して順次実行することができます。どちらの方法でも、ブロッキングストリームの使用は理想的ではないと思います。 – yosriz
最高のデザインは、ここで実際に行われた作業が数に関係する重労働であることに非常に依存していますか?両方に関連して観測可能なソースですか? Observableには出力が必要ですか?いずれにせよ、単一のストリームでも、それを分割してロジックをアプリケーションのさまざまな部分に追加することができます – yosriz