2017-03-09 18 views
2

これは簡単な質問かもしれませんが、私はこの一見簡単な作業の解決策を見つけることができません。私がReactiveSwiftと反応型プログラミングに慣れていないので、私は単に何かを明らかにしていないかもしれません。私は、信号から特定の期間にわたり、すべての値を収集したいシグナル:時間間隔で値を収集

signal.collect(timeInterval: .seconds(5)) 

基本的に私が何をしたいのか、このようなものです。結果の信号は、第1の信号からの収集されたイベントの配列を含むx秒ごとのイベントを生成する。

ReactiveSwiftでこれを行う最良の方法は何ですか?

答えて

2

このタスクでは、ReactiveSwiftに組み込み演算子はありません。代わりに、拡張書き込み、次のアプローチを使用することができます。

import Foundation 
import ReactiveSwift 
import Result 
public extension Signal { 
    public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler()) -> Signal { 
     let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe() 
     onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) { 
      pipe.1.sendCompleted() 
     } 
     return Signal { observer in 
      return self.observe { event in 
       switch event { 
       case let .value(value): 
        observer.send(value: value) 
       case .completed: 
        observer.sendCompleted() 
       case let .failed(error): 
        observer.send(error: error) 
       case .interrupted: 
        observer.sendInterrupted() 
       } 
      } 
     }.take(until: pipe.0) 
    } 

    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> { 
     return self.completeAfter(after: until).collect() 
    } 
} 

をそしてsignal.collectUntil(5)メソッドを使用します。

別の方法は、ReactiveSwiftのtimer機能を使用することです。例(上記のように、同じ拡張子に追加):それは内部信号を抽出SignalProducerタイプの性質を偽造ているため

public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> { 
    var signal: Signal<(), NoError>? = nil 
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in 
     signal = innerSignal.map { _ in() }.take(first: 1) 
    } 
    return self.take(until: signal!).collect() 
} 

私は、しかし、このアプローチは好きではありません。

Signalタイプ自体もtimeout機能ですが、エラーが発生しているので使いにくいです。それを使用する方法の例(まだ、同じ拡張子に追加):

public func completeOnError() -> Signal<Value, Error> { 
    return Signal { observer in 
     return self.observe { event in 
      switch(event) { 
      case .value(let v): observer.send(value: v) 
      case .failed(_): observer.sendCompleted() 
      case .interrupted: observer.sendInterrupted() 
      case .completed: observer.sendCompleted() 
      } 
     } 
    } 
} 

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> { 
    return self 
     .timeout(after: until, 
       raising: NSError() as! Error, 
       on: QueueScheduler()) 
     .completeOnError() 
     .collect() 
} 

P.S. 3つのオプションのいずれかを選択することにより、正しいスケジューラーを渡すか、ソリューションを正しいスケジューラーでパラメトリックにすることを心がけてください。

+0

あなたの答えは正しいとは言いましたが、それは私が探していたものではありませんでしたが、あなたは正しい方向に私を指摘しました。私は私の問題を解決するために書いた拡張を含む自分自身の答えを追加しました。 –

0

answer by Petro Korienev(悲しいことに私が探していたものではありません)に基づいて、自分の問題を解決する拡張機能を作成しました。この拡張機能は、ReactiveSwiftの機能にできるだけ近い位置にとどまるように、ReactiveSwiftの機能に従います。

指定したtimeIntervalを超えるすべての送信値を収集し、配列として送信します。終了イベントでは、残っている値があればそれも送信します。

extension Signal { 
    func collect(timeInterval: DispatchTimeInterval, 
       on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> { 
     return Signal<[Value], Error> { observer in 
      var values: [Value] = [] 
      let sendAction:() -> Void = { 
       observer.send(value: values) 

       values.removeAll(keepingCapacity: true) 
      } 
      let disposable = CompositeDisposable() 
      let scheduleDisposable = scheduler.schedule(
        after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate), 
        interval: timeInterval, 
        action: sendAction 
      ) 

      disposable += scheduleDisposable 
      disposable += self.observe { (event: Event<Value, Error>) in 
       if event.isTerminating { 
        if !values.isEmpty { 
         sendAction() 
        } 

        scheduleDisposable?.dispose() 
       } 

       switch event { 
       case let .value(value): 
        values.append(value) 
       case .completed: 
        observer.sendCompleted() 
       case let .failed(error): 
        observer.send(error: error) 
       case .interrupted: 
        observer.sendInterrupted() 
       } 
      } 

      return disposable 
     } 
    } 
} 

extension SignalProducer { 
    func collect(timeInterval: DispatchTimeInterval, 
       on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> { 
     return lift { (signal: ProducedSignal) in 
      signal.collect(timeInterval: timeInterval, on: scheduler) 
     } 
    } 
} 

extension DispatchTimeInterval { 
    var timeInterval: TimeInterval { 
     switch self { 
     case let .seconds(s): 
      return TimeInterval(s) 
     case let .milliseconds(ms): 
      return TimeInterval(TimeInterval(ms)/1000.0) 
     case let .microseconds(us): 
      return TimeInterval(UInt64(us) * NSEC_PER_USEC)/TimeInterval(NSEC_PER_SEC) 
     case let .nanoseconds(ns): 
      return TimeInterval(ns)/TimeInterval(NSEC_PER_SEC) 
     } 
    } 
} 
+0

今私はあなたの要点を持っており、達成しようとしていることが「サンプリング」アプローチに近いと思われます。 私は短い要点を作りました、probalbyも助けてくれるかもしれません。https://gist.github.com/soxjke/f9271d452292af67cfc4bb83593588d3 –

+0

@PetroKorienev、私は両方のアプローチをあなたのGistとZeekersから試しました。しかし、コレクションやサンプルが送信される時間の周りに短い間隔で来る場合、両方ともイベントをドロップさせます。その問題を緩和するためのあらゆるアイデア? –

+0

私は自分よりも先に進んだ。問題はまったく異なっていたので、コレクション中にデータが失われたように見える –