私はRxJavaを実行し、データを生成するためにonNext()
メソッドを使用するサブジェクトを作成しています。私は春を使用しています。異なるスレッドでPublishSubjectを実行するrxJava
これは私の設定です:新しいデータがRxJavaストリーム上で生成され
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
方法は@Autowire private SubjectObserver subjectObserver
であり、その後、どんなに私がsubscribeOn()
& observeOn()
に指定したものをsubjectObserver.publish(newDataObjGenerated)
を呼び出しません:
- Schedulers.io()
- Schedulers.computation()
- 私のスレッド
- Schedulers.newThread
onNext()
とその中に実際の作業は、実際には/農産物を生成するために、被写体にonNext()
を呼び出すのと同じスレッド上で実行されますデータ。
これは間違いありませんか?もしそうなら、私は何が欠けていますか?私はdoSomething()
が別のスレッドで実行されることを期待していました。私の呼び出し元のクラスで
更新は
、私は新しいスレッドが上で実行するために、加入者に割り当てられている、その後の進路、publish
メソッドを呼び出しています方法を変更した場合。
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
おかげで、
発生したことは、設計によるものです。 threadXからonNextを呼び出すと、チェーン内の次の演算子が同じスレッドによって呼び出されます。 onNextが呼び出される場所から操作することはできないため、subscribeOnは役に立ちません。toSerializedを使用することによってのみ契約を維持できます。 onNext-operatorを使用して「イベント」インスタンスを作成し、observeOnを使用し、スレッドを変更した後にflatMapを使用してdoSomething()を呼び出すことをお勧めします。 subscribe-callbackでは、UIを設定するか、結果を使って何をしたいかを設定します。 –