私はRxJava
を学習しており、DBからデータを読み込んでそれをQueueに投稿するシナリオをテストしています。私はちょうど全体のプロセスのサンプル模擬を作ったが、私はそれが欲しかったようにObservable
が働いているとは思わない。非同期に。Observableが非同期ではありません
これは私のコードです:
package rxJava;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
public class TestClass {
public static void main(String[] args) {
TestClass test = new TestClass();
System.out.println("---START---");
test.getFromDB().subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Publish complete.");
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onNext(String s) {
test.publishToQueue(s).subscribe(new Observer<Boolean>() {
@Override
public void onNext(Boolean b) {
if (b) {
System.out.println("Successfully published.");
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable arg0) {
}
});
};
});
System.out.println("---END---");
}
public Observable<String> getFromDB() {
List<String> list = new ArrayList<String>();
for (int i = 0; i < 30; i++) {
list.add(Integer.toString(i));
}
return Observable.from(list).doOnNext(new Action1<String>() {
@Override
public void call(String temp) {
if (temp.contains("2")) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
public Observable<Boolean> publishToQueue(String s) {
return Observable.defer(() -> {
try {
if (s.contains("7")) {
Thread.sleep(700);
}
System.out.println("Published:: " + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Observable.just(true);
});
}
}
は、私は非同期にDBからリストを取得し、キューにそれを投稿したいとし、。私はgetFromDB
から返されたObservable
を使用しており、私はDBから取得したデータを模倣してそれを購読しています。 DBからデータを取得するたびに、publishToQueue
を使用してキューにプッシュし、Observable
を返します。キューの呼び出しを非同期にする必要がありました。今私が返す(Observable<Boolean>
)Boolean
のような待ち行列からの肯定応答で、何かを印刷したいと思います。
基本的には、両方のプロセスを非同期にしたいだけです。 DBからのすべてのデータに対して、私はそれをキューに非同期にプッシュします。
私は両方のメソッド、dbコール、キューに遅延を模倣し、非同期操作をテストするためにThread.sleep()
を追加しました。私はこれが問題の原因だと思う。しかし、私もObseravable.delay()
を試しましたが、それは出力を生成しません。
私はこれがどのように機能し、どのように私はそれが私がそれを必要とするように動作させることができます理解してください。
説明できますか? – v1shnu