2017-10-16 9 views
2

途中でflatMapを適用した後、元の順序をストリームに保持しようとしています。ここでflatMapを使用した後、元のストリームの元の順序を維持しているRx swift

は、私が何を意味するか詳しく説明するための図である。

---- 2-4-1 ------------------(オリジナルストリーム)

----------- 1--2 --------- 4--(ネットワークアクティビティ - 遅れてflatMapによって表される)

------ --------- 2 ---------ここ4-1(指名手配結果)

は、状況を詳しく説明するためのコードです:

persistMessageEventBus.flatMap({ num -> Observable<Int> in 

     print("aaab Doing \(num)") 

     let t2g = Observable.just(num).delay(Double(num), scheduler: MainScheduler.instance).do(onNext:{ num in print("aaab Done async \(num)")}) 

     return t2g 

    }).concatMap({ num -> Observable<Int> in 

     print("aaab Done map \(num)") 

     return Observable.just(num) 

    }).subscribe(onNext: { num in 

     print("aaab done \(num)") 

    }).addDisposableTo(disposeBag) 

    persistMessageEventBus.onNext(2) 
    persistMessageEventBus.onNext(4) 
    persistMessageEventBus.onNext(1) 

出力は次のとおりです。

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done map 1 
aaab done 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 

希望の出力は次のようになります。

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 
aaab Done map 1 
aaab done 1 

はRxSwiftでそのようなものはありますか?

答えて

1

代わりに.concatMap()を使用すると、元の順序が保証されます。

UPDATE#1

は、その後明らかにそれがインデックス化し、いくつかのバッファリングが必要になります。

 typealias Indexed = (num: Int, index: Int) 

     class Buffer { 
      let ordered = PublishSubject<Int>() 
      private var current = 0 
      private var buffer: [Int: Int] = [:] 
      func onNext(_ indexed: Indexed) { 
       self.buffer[indexed.index] = indexed.num 
       for index in self.buffer.keys.sorted() { 
        if index == current { 
         ordered.onNext(self.buffer[index]!) 
         self.buffer.remove(at: self.buffer.index(forKey: index)!) 
         current += 1 
        } 
       } 
      } 
     } 

     let buffer = Buffer() 

     buffer 
      .ordered 
      .subscribe(onNext: { num in 

       print("aaab done \(num)") 

      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus 
      .mapWithIndex { (num, index) -> Indexed in 
       return (num: num, index: index) 
      } 
      .flatMap({ indexed -> Observable<Indexed> in 

       print("aaab Doing \(indexed.num)") 

       let t2g = Observable.just(indexed).delay(Double(indexed.num), scheduler: MainScheduler.instance).do(onNext: { indexed in print("aaab Done async \(indexed.num)") }) 

       return t2g 

      }) 
      .subscribe(onNext: { indexed in 
       buffer.onNext(indexed) 
      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus.onNext(2) 
     persistMessageEventBus.onNext(4) 
     persistMessageEventBus.onNext(1) 
 
aaab Done async 1 
aaab done 2 
aaab Done async 2 
aaab done 4 
aaab Done async 4 
aaab done 1 
+0

うんが、その後ネットワークアクティビティは、次のように見ているので、代わりに並列になりません。--------------- 2 ----- ---- 4-1ストリームはこのように見えます--------------- 2 --------------------- ---- 4-1 – Rotem

+0

@Rotem ** UPDATE#1 ** –

+0

@ Max-Volginありがとうございました。これは私がこの問題を解決した理由ですこれはRxにネイティブ演算子があるかどうか疑問に思っていたからです。とにかく、私はこの答え、感謝とggを受け入れます! – Rotem

関連する問題