2016-10-25 14 views
0

私はRxJavaを実行し、データを生成するためにonNext()メソッドを使用するサブジェクトを作成しています。私はを使用しています。異なるスレッドでPublishSubjectを実行するrxJava

これは私の設定です:新しいデータがRxJavaストリーム上で生成され

@Component 
public class SubjectObserver { 
    private SerializedSubject<SomeObj, SomeObj> safeSource; 
    public SubjectObserver() { 
     safeSource = PublishSubject.<SomeObj>create().toSerialized(); 
     **safeSource.subscribeOn(<my taskthreadExecutor>);** 
     **safeSource.observeOn(<my taskthreadExecutor>);** 
     safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() { 
      @Override 
      public void onNext(AsyncRemoteRequest asyncRemoteRequest) { 
      LOGGER.debug("{} invoked.", Thread.currentThread().getName()); 
      doSomething(); 
      } 
     } 
    } 
    public void publish(SomeObj myObj) { 
     safeSource.onNext(myObj); 
    } 
} 

方法は@Autowire private SubjectObserver subjectObserver であり、その後、どんなに私がsubscribeOn() & observeOn()に指定したものをsubjectObserver.publish(newDataObjGenerated)

を呼び出しません:

  • Schedulers.io()
  • Schedulers.computation()
  • 私のスレッド
  • Schedulers.newThread

onNext()とその中に実際の作業は、実際には/農産物を生成するために、被写体にonNext()を呼び出すのと同じスレッド上で実行されますデータ。

これは間違いありませんか?もしそうなら、私は何が欠けていますか?私はdoSomething()が別のスレッドで実行されることを期待していました。私の呼び出し元のクラスで

更新

、私は新しいスレッドが上で実行するために、加入者に割り当てられている、その後の進路、publishメソッドを呼び出しています方法を変更した場合。

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj)); 

おかげで、

+0

発生したことは、設計によるものです。 threadXからonNextを呼び出すと、チェーン内の次の演算子が同じスレッドによって呼び出されます。 onNextが呼び出される場所から操作することはできないため、subscribeOnは役に立ちません。toSerializedを使用することによってのみ契約を維持できます。 onNext-operatorを使用して「イベント」インスタンスを作成し、observeOnを使用し、スレッドを変更した後にflatMapを使用してdoSomething()を呼び出すことをお勧めします。 subscribe-callbackでは、UIを設定するか、結果を使って何をしたいかを設定します。 –

答えて

3

余分な行動で新しいインスタンスを返すObservable/Subject上の各オペレータは、しかし、あなたのコードだけでsubscribeOnobserveOnを適用し、その後、彼らは生産と生Subjectに加入しているものは何でも捨て。

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized(); 

safeSource 
.subscribeOn(<my taskthreadExecutor>) 
.observeOn(<my taskthreadExecutor>) 
.subscribe(new Subscriber<AsyncRemoteRequest>() { 
    @Override 
    public void onNext(AsyncRemoteRequest asyncRemoteRequest) { 
     LOGGER.debug("{} invoked.", Thread.currentThread().getName()); 
     doSomething(); 
    } 
}); 

注そのsubscribe()方法で起こって何のサブスクリプションの副作用がないためsubscribeOnPublishSubjectには実用的な効果を持っていないこと:あなたは、チェーン、メソッドの呼び出しとサブスクライブする必要があります。

+0

素晴らしい!ありがとう、ちょうどこの重要な側面を見落とした。ありがとう! – user3169330

関連する問題