2013-09-29 13 views
9

このコードはスレッドセーフですか?RxJavaスレッドセーフ

Observable<String> observable = ... // some observable that calls 
            // onNext from a background thread 

observable 
    .scan(new ArrayList<String>(), (List<String> acc, String next) -> { 
    acc.add(next); 
    return acc; 
    }) 
    .subscribe(list -> { 
    // do somethind with sequence of lists 
    ... 
    }); 

ArrayListはスレッドセーフなデータ構造ではないので、私は不思議です。

+0

Rxデザインガイドラインが役立ちます。http://go.microsoft.com/fwlink/?LinkID=205219 –

+0

[RxJavaのスレッドセーフティにはSerializedSubjectが必要です](RxJavaのhttp://stackoverflow.com/questions/31841809/is-serializedsubject-necessary-for-thread-safety-in-rxjava) –

答えて

6

簡単な答えとして、.NET(元のRx実装)では、観測可能なシーケンスからのすべての値は連続しているとみなすことができます。これは、マルチスレッドになることを排除するものではありません。ただし、マルチスレッド方式で値を生成する場合は、.NET Synchronize() Rx演算子と同等の機能を探して、順次性質を適用することができます。

もう1つの方法は、RxJavaのソースコードでScanの実装をチェックし、アキュムレータ機能で安全性を提供したいと思っている逐次性を実現することを検証することです。

+0

RxJavaの 'scan' .Netオリジナル版とまったく同じです。 – allprog

+0

乾杯。私は主に、時間がたつにつれて、.NETとJavaのコードベースが同じであることを確信できないと言います。 –

4

このコードがスレッドセーフでない場合、RxJavaが破損しているか、またはObservableソースが破損しています。リエントラントでない演算子はRxコントラクトの一部です。

+0

これは、Rxの 'Checked()'オブザーバ演算子がRxJavaによって実際に実装されている場合に実行することができます。プルリクエスト、誰ですか? –

+0

ブロックのスレッドの安全性は私の意見ではあまり簡単ではありません。ポストに示されているコードでは詳細はあまり含まれていません。 @orionllが気付かなかったのは、サブスクリプションがデフォルトで同期して配信されるということです。しかし、非同期サブスクリプションに変換された場合、配列リストにアクセスすることはもはや安全ではありません。 – allprog

+1

@allprogこれはあまり単純ではないというのは間違いありませんが、これはRxの利点の1つです。 –