2016-07-29 12 views
0

私はRxJavaを学習しており、DBからデータを読み込んでそれをQueueに投稿するシナリオをテストしています。私はちょうど全体のプロセスのサンプル模擬を作ったが、私はそれが欲しかったようにObservableが働いているとは思わない。非同期に。Observableが非同期ではありません

これは私のコードです:

package rxJava; 

import java.util.ArrayList; 
import java.util.List; 

import rx.Observable; 
import rx.Observer; 
import rx.functions.Action1; 

public class TestClass { 

    public static void main(String[] args) { 

     TestClass test = new TestClass(); 
     System.out.println("---START---"); 

     test.getFromDB().subscribe(new Observer<String>() { 

      @Override 
      public void onCompleted() { 
       System.out.println("Publish complete."); 
      } 

      @Override 
      public void onError(Throwable t) { 
       System.out.println(t.getMessage()); 
      } 

      @Override 
      public void onNext(String s) { 
       test.publishToQueue(s).subscribe(new Observer<Boolean>() { 

        @Override 
        public void onNext(Boolean b) { 
         if (b) { 
          System.out.println("Successfully published."); 
         } 
        } 

        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable arg0) { 
        } 
       }); 
      }; 
     }); 
     System.out.println("---END---"); 
    } 

    public Observable<String> getFromDB() { 

     List<String> list = new ArrayList<String>(); 
     for (int i = 0; i < 30; i++) { 
      list.add(Integer.toString(i)); 
     } 
     return Observable.from(list).doOnNext(new Action1<String>() { 
      @Override 
      public void call(String temp) { 
       if (temp.contains("2")) { 
        try { 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
       } 
      } 
     }); 

    } 

    public Observable<Boolean> publishToQueue(String s) { 

     return Observable.defer(() -> { 
      try { 
       if (s.contains("7")) { 
        Thread.sleep(700); 
       } 
       System.out.println("Published:: " + s); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      return Observable.just(true); 
     }); 
    } 
} 

は、私は非同期にDBからリストを取得し、キューにそれを投稿したいとし、。私はgetFromDBから返されたObservableを使用しており、私はDBから取得したデータを模倣してそれを購読しています。 DBからデータを取得するたびに、publishToQueueを使用してキューにプッシュし、Observableを返します。キューの呼び出しを非同期にする必要がありました。今私が返す(Observable<Boolean>Booleanのような待ち行列からの肯定応答で、何かを印刷したいと思います。

基本的には、両方のプロセスを非同期にしたいだけです。 DBからのすべてのデータに対して、私はそれをキューに非同期にプッシュします。

私は両方のメソッド、dbコール、キューに遅延を模倣し、非同期操作をテストするためにThread.sleep()を追加しました。私はこれが問題の原因だと思う。しかし、私もObseravable.delay()を試しましたが、それは出力を生成しません。

私はこれがどのように機能し、どのように私はそれが私がそれを必要とするように動作させることができます理解してください。

答えて

1

、RxJavaは同期です。これは、デフォルトでは、すべてが同じスレッド(および現在のスレッド)で実行されることを意味します。あなたはobserveOn/subscribeOnメソッドに別のスレッドのおかげでタスクを実行する、または別のジョブでタスクを実行するいくつかの演算子(それはdelayintervalのように、別のスケジューラを使用しているために、...)あなたの例では

、あなたが持って使用することができますスケジューラがサブスクリプションを実行するかどうかを明確に設定します。

test.getFromDb() 
    .subscribeOn(Schedulers.io()) 
    .subscribe(); 

は、その後、あなたはflatMap演算子を使用して、あなたのpublishToQueueメソッドを呼び出すことができます(ここでは、どのスレッドObservable.fromであなたのリストを出力します)。このメソッドは以前のスケジューラーで実行されますが、observeOnメソッドのおかげで別のスケジューラーを強制的に使用することができます。 observeOnメソッドの後のすべてが別のスレッドで実行されます。

test.fromDb() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(Schedulers.computation()) 
    .flatMap(l -> test.publishToqueue(l)) 
    .subscribe(); 
関連する問題