2017-07-01 7 views
0

RxJava2を初めて使用しています。下のコードでは、Observable/Flowableがメインスレッド上で発信していて、(subscribeOn(Schedulers。*)呼び出しを使用して)Schedulerが指定されていない場合でも、サブスクライバがバックグラウンドスレッドでどのように働いているのか理解できません。あなたは、同期サブスクリプションにRxJavaのデフォルトを購読するにスケジューラを指定しなかったので、完全なコードはthis github repo.RxJavaスレッドスケジューリングに関する問い合わせ

@OnClick(R.id.btn_start_simple_polling) 
public void onStartSimplePollingClicked() { 

    _log("onStartSimplePollingClicked called on "); //MAIN THREAD 

    final int pollCount = POLL_COUNT; 

    Disposable d = Observable 
      .interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS) 
      .map(this::_doNetworkCallAndGetStringResult) 
      .take(pollCount) 
      .doOnSubscribe(subscription -> { 
       _log(String.format("Start simple polling - %s", _counter));  //MAIN THREAD 
      }) 
      .subscribe(taskName -> { 
       _log(String.format(Locale.US, 
           "Executing polled task [%s] now time : [xx:%02d]", 
           taskName, 
           _getSecondHand())); 
      }); 

    _disposables.add(d); 
} 

private String _doNetworkCallAndGetStringResult(long attempt) { 
    try { 
     _log("_doNetworkCallAndGetStringResult called on "); //BACKGROUND THREAD 
     if (attempt == 4) { 
      // randomly make one event super long so we test that the repeat logic waits 
      // and accounts for this. 
      Thread.sleep(9000); 
     } 
     else { 
      Thread.sleep(3000); 
     } 

    } catch (InterruptedException e) { 
     Timber.d("Operation was interrupted"); 
    } 
    _counter++; 

    return String.valueOf(_counter); 
} 

答えて

4

で見つけることができます。したがって、onSubscribedoOnSubscribeへのコールはメインスレッドで発生します。

しかし、Observable.intervalオペレータは、onNextイベントをブロードキャストするために、暗黙的スケジューラまたは明示スケジューラのいずれかを必要とします。スケジューラーを指定していないので、デフォルトでSchedulers.computation()になります。 インターバルが発生すると、同じ計算スレッドで_doNetworkCallAndGetStringResultを呼び出し続け、バックグラウンドで発生します。デフォルトsyncroniously実行が、いくつかの事業者による

0

RxJava @Kiskaeはすでに間隔、遅延またはいくつかの他

としてあなたに言ったとして、あなたがasyncroniouslyパイプラインを実行したい場合は、パイプラインを実行するようになりますどのobserverOnを使用する必要があります別のスレッドは、一度

/** 
    * Once that you set in your pipeline the observerOn all the next steps of your pipeline will be executed in another thread. 
    * Shall print 
    * First step main 
    * Second step RxNewThreadScheduler-2 
    * Third step RxNewThreadScheduler-1 
    */ 
    @Test 
    public void testObservableObserverOn() throws InterruptedException { 
     Subscription subscription = Observable.just(1) 
       .doOnNext(number -> System.out.println("First step " + Thread.currentThread() 
         .getName())) 
       .observeOn(Schedulers.newThread()) 
       .doOnNext(number -> System.out.println("Second step " + Thread.currentThread() 
         .getName())) 
       .observeOn(Schedulers.newThread()) 
       .doOnNext(number -> System.out.println("Third step " + Thread.currentThread() 
         .getName())) 
       .subscribe(); 
     new TestSubscriber((Observer) subscription) 
       .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
    } 

か、それはあなたが

/** 
* Does not matter at what point in your pipeline you set your subscribeOn, once that is set in the pipeline, 
* all steps will be executed in another thread. 
* Shall print 
* First step RxNewThreadScheduler-1 
* Second step RxNewThreadScheduler-1 
*/ 
@Test 
public void testObservableSubscribeOn() throws InterruptedException { 
    Subscription subscription = Observable.just(1) 
      .doOnNext(number -> System.out.println("First step " + Thread.currentThread() 
        .getName())) 
      .subscribeOn(Schedulers.newThread()) 
      .doOnNext(number -> System.out.println("Second step " + Thread.currentThread() 
        .getName())) 
      .subscribe(); 
    new TestSubscriber((Observer) subscription) 
      .awaitTerminalEvent(100, TimeUnit.MILLISECONDS); 
} 
を指定することをスレッドにあなたのパイプラインの実行を行いますsubscribeOnを使用し、あなたのパイプラインに入れています10

ここでは、async rxJavaの例をもっと見ることができますhttps://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

関連する問題