2017-09-28 16 views
0

サービスオブジェクトを指定すると、サービスへの各関数呼び出しで副作用が発生しないようにしたいと思います。私の場合、関数Aが何をしていても、スケジューラーが利用可能でない限り、関数Bで何も実行されません。ここでconcatMap/flatMapは同じスケジューラですぐに実行する必要があります

が、これは次のようになります。現時点では

class Service { 

    func handleJobA(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job A: \(str)") 
      }) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job AA: \(input)") 
        .delay(2, scheduler: self.scheduler) 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job B: \(str)") 
      }) 
      .delay(1, scheduler: scheduler) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job BB: \(input)") 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 


    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
} 


let service = Service() 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobA(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobB(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

import PlaygroundSupport 

PlaygroundPage.current.needsIndefiniteExecution = true 

、出力は次のようになります。

Job A: 1 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job AA: 1 
Job AA: 1 √ 
Job A: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

しかし、これは根本的な問題を示しています。内部遅延(何か、実際にはネットワークから発生する可能性があります)は、観測可能な処理が「オーダー」から抜け出す原因となります。関数は、タスクの処理を開始した後、それが行われない限り、

意味
Job A: 1 
Job AA: 1 
Job AA: 1 √ 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job A: 2 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

、誰もがアクセスのを取得していない:

は、私が欲しいのはこれです。

私は前もって非常に良いanswerを受け取った。 flatMap/concatMap(?)は両方ともスケジューラを嫌うように見えるので、完全には適用できません。

私の理論では、実際にはconcatMapコールが適切な仕事をしていますが、次にスケジューラーキューの最後に子シーケンスの省略をスケジュールします。それが行われない限り

答えて

1

私はスケジューラの動作を説明することはできません...しかし、私は一度関数はタスクの処理を開始した小さな提案

を...作ることができ、誰もの のアクセスを取得していません。 ..

あなたはあなたが必要と動作を取得するためにconcatMapを介してすべてのあなたのhandleJob呼び出しを渡すことができます。

Observable 
    .from([1,2,3,4,5,6]) 
    .flatMap({ (value) -> Observable<String> in 
     switch value % 2 == 0 { 
     case true: 
      return service.handleJobA(input: "\(value)") 
     case false: 
      return service.handleJobB(input: "\(value)") 
     } 
    }) 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

サービスクラスの例:

private class Service { 

    private lazy var result = PublishSubject<(index: Int, result: String)>() 
    private lazy var publish = PublishSubject<(index: Int, input: String, transformation: (String) -> String)>() 
    private lazy var index: Int = 0 
    private lazy var disposeBag = DisposeBag() 

    init() { 
     publish 
      .asObservable() 
      .concatMap({ (index, input, transformation) -> Observable<(index: Int, result: String)> in 
       let dueTime = RxTimeInterval(arc4random_uniform(3) + 1) 
       return Observable 
        .just((index: index, result: transformation(input))) 
        .delay(dueTime, scheduler: self.scheduler) 
      }) 
      .bind(to: result) 
      .disposed(by: disposeBag) 
    } 

    func handleJobA(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job A: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job B: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJob(input: String, transformation: @escaping (String) -> String) -> Observable<String> { 
     index += 1 
     defer { 
      publish.onNext((index, input, transformation)) 
     } 
     return result 
      .filter({ [expected = index] (index, result) -> Bool in 
       return expected == index 
      }) 
      .map({ $0.result }) 
      .take(1) 
      .shareReplayLatestWhileConnected() 
    } 

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
} 
関連する問題