2017-01-19 11 views
2

Rx演算子の実行順序を把握しようとしています。Rx演算子の実行順序

私が知っているのは、最後のものが作成演算子です。観察者は加入者がそこにいる(冷たい観察可能な)まで作成されません。これは、両方のimmediatetrampolineスケジューラ(上で実行することの両方を表示さ

[email protected]- 
MAP Thread[main,5,main] 
SUBSCRIBE Thread[main,5,main] 
CREATE Thread[main,5,main] 
[email protected]-- 
CREATE Thread[RxComputationScheduler-3,5,main] 
MAP Thread[RxComputationScheduler-2,5,main] 
SUBSCRIBE Thread[RxComputationScheduler-1,5,main] 
[email protected] 
MAP Thread[pool-1-thread-2,5,main] 
CREATE Thread[pool-1-thread-3,5,main] 
SUBSCRIBE Thread[pool-1-thread-1,5,main] 
[email protected]---- 
CREATE Thread[RxIoScheduler-4,5,main] 
MAP Thread[RxIoScheduler-3,5,main] 
SUBSCRIBE Thread[RxIoScheduler-2,5,main] 
[email protected]- 
MAP Thread[RxNewThreadScheduler-2,5,main] 
SUBSCRIBE Thread[RxNewThreadScheduler-1,5,main] 
CREATE Thread[RxNewThreadScheduler-3,5,main] 
[email protected]-- 
MAP Thread[main,5,main] 
SUBSCRIBE Thread[main,5,main] 
CREATE Thread[main,5,main] 

(複数の実行についても同様)

public static void main(String[] args) throws InterruptedException { 

    test(Schedulers.immediate()); 
    test(Schedulers.computation()); 
    ExecutorService executor = Executors.newCachedThreadPool(); 
    test(Schedulers.from(executor)); 
    executor.shutdown(); 
    test(Schedulers.io()); 
    test(Schedulers.newThread()); 
    test(Schedulers.trampoline()); 

} 


static void test(Scheduler scheduler) throws InterruptedException { 
    System.out.printf("-------%s--------\n", scheduler); 

    Observable<Integer> create = Observable.create(c -> { 
     c.onNext(1); 
     c.onCompleted(); 
     print("CREATE"); 
    }); 

    create 
    .subscribeOn(scheduler) 
    .observeOn(scheduler) .map(e -> { print("MAP"); return e * 2; }) 
    .observeOn(scheduler) .subscribe(a -> { print("SUBSCRIBE");}); 

    TimeUnit.MILLISECONDS.sleep(200); 
} 

static synchronized void print(String s) { 
    System.out.printf("%s %s\n", s, Thread.currentThread()); 
} 

出力:

だから、私はこの動作をテストする場合は、このコードを書かメインスレッド)は、私が期待する正しい方法を実行します。

しかし、他のスケジューラは異なります(しかし、synchronizingの方法print、AFAIKは競合状態がstd outputになるのを防ぎます)。

なぜこのようなことが起こっていますか?

答えて

0

immediateおよびtrampolineスケジューラは現在の(単一の)スレッドを使用するため、実行順序が厳密に定義されています。

他のすべてのスケジューラはマルチスレッドです。 3つのタスクを3つの異なるスレッドにスケジュールします。

MAPはSUBSCRIBEの前に常に来なければなりません。なぜなら、SUBSCRIBEはMAPの完了後にスケジュールされるだけです(map()の結果はsubscriberに渡されます)。

それ以外に、タスクがどのような順序でシリアル化されるかは保証されません(print関数によって)。

+0

おかげ。私はちょうど、この順番で作成されたスレッド(subscribe、map、create) - スレッド番号に基づいています。しかし、それはメインスレッドの場合のようにマップを作成する必要がありますか? –

+0

@MuhammadHewedyどのスレッドを作成するかは関係ありません。とにかく特定の注文の保証はありません。それはマルチ踏みの性質です。プログラムを実行するたびに同じ結果(すべてのスケジューラで同じ順序)が表示されますか? 5〜10回確認してください。 –

1

あなたが観察することができるのは、あなたがサブスクリプションを呼び出すまで、コールドチェーンが始まっていないからです。

あなたがサブスクリプションチェーンと呼ばれるときに開始され、最初にrx.Observable.OnSubscribe#callを呼び出すと、rx.Observer#onNextという値がチェーンに送信されます。あなたがmapを他のスレッドとメインスレッドに投稿して呼び出すスケジューラを指定すると、rx.Observable.OnSubscribe#callの実行が完了するまでに時間がかかっていません。

あなたはprint("CREATE")rx.Observer#onNext上記のシーケンスを移動する場合は、必ずCREATEされます - > MAP - >この場合、MAPで

をSUBSCRIBE常に前に、SUBSCRIBE。定義されていない位置にあるスレッドがすべて実行された場合、CREATEは最後になります。スレッドの切り替えによって位置が不定です。

更新MAPスレッドの前に作成されたスレッドをSUBSCRIBEなぜコメント

によると?

各オペレータは、観測可能なオブジェクトを別のオブジェクトにラップして返します。

subscribe()を呼び出すと、最後に作成されたobservableからrx.Observable.OnSubscribe#callが呼び出されます。

その後、スタックを介して処理を行います。

rx.internal.operators.OnSubscribeMap#call 
rx.internal.operators.OperatorObserveOn#call 
rx.internal.operators.OnSubscribeMap#call 
rx.internal.operators.OperatorSubscribeOn#call 
... 

そして、あなたはOperatorObserveOn(コードcuted)

public final class OperatorObserveOn<T> implements Operator<T, T> { 

    public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
     this.scheduler = scheduler; 
     this.delayError = delayError; 
     this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
    } 

    @Override 
    public Subscriber<? super T> call(Subscriber<? super T> child) { 
     if (scheduler instanceof ImmediateScheduler) { 
      // avoid overhead, execute directly 
      return child; 
     } else if (scheduler instanceof TrampolineScheduler) { 
      // avoid overhead, execute directly 
      return child; 
     } else { 
      ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); 
      parent.init(); 
      return parent; 
     } 
    } 

    /** Observe through individual queue per observer. */ 
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { 
     public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { 
      this.child = child; 
      this.recursiveScheduler = scheduler.createWorker(); 
      this.delayError = delayError; 
      this.on = NotificationLite.instance(); 
      int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
      // this formula calculates the 75% of the bufferSize, rounded up to the next integer 
      this.limit = calculatedSize - (calculatedSize >> 2); 
      if (UnsafeAccess.isUnsafeAvailable()) { 
       queue = new SpscArrayQueue<Object>(calculatedSize); 
      } else { 
       queue = new SpscAtomicArrayQueue<Object>(calculatedSize); 
      } 
      // signal that this is an async operator capable of receiving this many 
      request(calculatedSize); 
     } 
    } 
} 

に見ればあなたはそれをcallcreateNewWorkerを見ることができます。

したがって、それぞれobserveOnは、それらの下にある実行演算子の新しい作業者を逆順に作成します。

あなたがeasylyで見ることができるようにSchedulers.newThread()

CREATE(3) - > MAP(2) - > SUBSCRIBE(1)

+0

CREATEに関係なく。では、上記のMAPスレッドの前にSUBSCRIBEスレッドが作成されたのはなぜですか? –

+0

マップの前にサブスクライブがある場合は表示されません –

+0

2,3,4,5の場合(最初と最後を除くすべて)、SUBSCRIBEのスレッド番号はMAPのスレッド番号の前に来ます。それはそれが前に実行されていることを意味しません(実際のスレッドの実行順序は私が話しているものではありません) –