2017-04-16 10 views
5

私は、次のコードを持っている:RxJava:観察可能およびデフォルトのスレッド

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1 

重要詳細:

Observable.create(new ObservableOnSubscribe<String>() { 
      @Override 
      public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { 
       Thread thread = new Thread(new Runnable() { 
        @Override 
        public void run() { 
         s.onNext("1"); 
         s.onComplete(); 
        } 
       }); 
       thread.setName("background-thread-1"); 
       thread.start(); 
      } 
     }).map(new Function<String, String>() { 
      @Override 
      public String apply(@NonNull String s) throws Exception { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("map: thread=" + threadName); 
       return "map-" + s; 
      } 
     }).subscribe(new Observer<String>() { 
      @Override 
      public void onSubscribe(Disposable d) {} 

      @Override 
      public void onNext(String s) { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onNext: thread=" + threadName + ", value=" + s); 
      } 

      @Override 
      public void onError(Throwable e) {} 

      @Override 
      public void onComplete() { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onComplete: thread=" + threadName); 
      } 
     }); 

そして、ここでは出力です私は別のスレッド(mainスレッドからsubscribeメソッドを呼んでいますAndroidで)。

Observableクラスは、同期、デフォルトであり、それは右、イベント(s.onNext)を発する同じスレッド上のすべて(map +通知する加入者のような演算子を)実行のようにそうに見えますか?私は疑問に思います...それは意図された行動ですか、私は何か誤解しましたか?実際には、私は少なくともonNextonCompleteのコールバックが発信イベントではなく呼び出し元のスレッドで呼び出されると予想していました。この特定のケースでは、実際の発信者のスレッドは問題ではないことを正しく理解していますか?少なくともイベントが非同期的に生成されるとき。

もう1つの懸念事項 - Observableを外部ソースからパラメータとして受け取った場合(つまり、私自身は生成しません)、そのユーザーが自分であるかどうかを確認する方法はありません同期または非同期で、私はsubscribeOnobserveOnの方法でコールバックを受け取る場所を明示的に指定するだけです。

ありがとうございます!

答えて

4

RxJavaは並行性については全く知られていません。 observeOn/subscribeOnのような他の機構を使用していない場合は、サブスクライブスレッドで値を生成します。演算子のスレッドのような低レベルの構造を使用しないでください、あなたは契約を破る可能性があります。

スレッドを使用するため、onNextは呼び出し元のスレッド( 'background-thread-1')から呼び出されます。サブスクリプションは呼び出し時に発生します(UIスレッド)。チェーンの下にあるすべての演算子は 'background-thread-1'-calling-Threadから呼び出されます。サブスクリプションonNextは、 'background-thread-1'からも呼び出されます。

呼び出しスレッド以外の値を生成する場合は、次のようにします。subscribeOn。スレッドをメインのuse observeOnに戻したい場合は、チェーンのどこかで行ってください。おそらくそれを購読する前に。

例:ここでは

Observable.just(1,2,3) // creation of observable happens on Computational-Threads 
      .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite 
      .map(integer -> integer) // map happens on Computational-Threads 
      .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread 
      .subscribe(integer -> { 
       // called from mainThread 
      }); 

は良いexplanitationです。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

関連する問題