2017-02-20 21 views
0

私はrxSwiftを勉強しています。永続的な接続を模倣しているこのサービスに、ロングポーリングサーバーとの相互作用のためのサービスを行いたいと思います。私はそれを書いたが、それは私には思われるが、その決定がより良く行われた可能性はないか? Observableを何らかの形で繰り返すことは可能ですか?エラーに関係なく、longpollサーバーの応答にもよります。rxSwiftでロングポーリング

誰でもソリューションを共有できますか?または助言を助ける?どのように整理するのが良いですか?私は、よりよい解決策を見たいだけrxswift

class LongPollingService { 

    public var messageReciver: PublishSubject<EventProtocol> = PublishSubject<EventProtocol>() 

    private let transport = DefaultTransport() 

    private let disposeBag = DisposeBag() 

    private var currentRequestInfo = Variable<LongpollingServerInfo?>(nil) 

    private var currentRequestDisposable: Disposable? 

    private var currentLongpollingConnection: Disposable? // Subsribee for request server info 

    private var eventListener : Disposable? 

    private var currentReqursiveConnection: Disposable? // Subscriber for event listener from longpoll server 

    func startObservableEvents() { 
     getServerConnection() 
     subscribeServerInfo() 
     //testing listen events 
     eventListener = messageReciver.showMessagesInDebugMode().subscribe() 
     eventListener?.addDisposableTo(disposeBag) 
    } 

    func disconnect() { 
     currentRequestDisposable?.dispose() 
     currentLongpollingConnection?.dispose() 
     currentReqursiveConnection?.dispose() 
    } 

    private func subscribeServerInfo() { 
     currentLongpollingConnection = currentRequestInfo 
      .asObservable() 
      .filter({$0 != nil}) 
      .subscribe(onNext: { [weak self] (info) in 
       guard let sSelf = self else { return } 
       sSelf.subscribeToEvents(timeStamp: info!.ts) 
      }) 
     currentLongpollingConnection?.addDisposableTo(disposeBag) 
    } 

    private func subscribeToEvents(timeStamp: TimeInterval) { 
     if let serverInfo = currentRequestInfo.value { 
      currentReqursiveConnection?.dispose() 
      currentReqursiveConnection = getEventsFromLongpollServer(serverInfo: serverInfo, with: timeStamp) 
       .flatMap(parseUpdates) 
       .flatMap(reciveEvents) 
       .showErrorsSwiftMessagesInDebugMode() 
       .subscribe(onNext: { [weak self] updates in 
        guard let sSelf = self else { return } 
        sSelf.subscribeToEvents(timeStamp: updates) 
       }, 
       onError: { [weak self] error in 
        guard let sSelf = self else { return } 
         if let error = error as? LongPollError { 
          switch error { 
          case .olderHistory(let ts): sSelf.subscribeToEvents(timeStamp: ts) 
          default: sSelf.getServerConnection() 
          } 
         } 
       }) 
      currentReqursiveConnection?.addDisposableTo(disposeBag) 
     } 
    } 

    private func getServerConnection() { 
     //get longpolling server info for connection. 
     currentRequestDisposable = getLongpollServerInfo() 
      .subscribe(onNext: {[weak self] info in 
       guard let sSelf = self else { return } 
       sSelf.currentRequestInfo.value = info 
      }) 
     currentRequestDisposable?.addDisposableTo(disposeBag) 
    } 

    private func parseUpdates(json: Any) throws -> Observable<LongPollingUpdates> { 
     let response = try Mapper<LongPollingUpdates>().map(JSONObject: json) 
     return .just(response) 
    } 

    private func reciveEvents(updates:LongPollingUpdates) throws -> Observable<TimeInterval> { 
     if let errors = updates.failed { 
      throw parseErrors(errors: errors) 
     } 
     if let events = updates.updates { 
      parseUpdates(updates: events) 
     } 
     return Observable.just(updates.timeStamp!) 
    } 

    private func parseUpdates(updates: [[Any]]) { 
     updates.forEach { (array) in 
      let firstElementInUpdate = array.first 
      if let update = firstElementInUpdate as? Int { 
       switch update { 
       case 1: break 
       case 2: break 
       case 3: break 
       case 4: messageReciver.onNext(NewMessage(array: array)) 
       default: break 
       } 
      } 
     } 
    } 

    private func parseErrors(errors: [String: Any]) -> LongPollError { 
     if let error = errors["failed"] as? Int { 
      switch error { 
      case 1: 
       guard let ts = errors["ts"] as? TimeInterval else { return .unkownError } 
       return .olderHistory(ts: ts) 
      case 2: return .needNewkey 
      case 3: return .needCaseAndTs 
      case 4: return .unkownVersion 
      default: 
       return .unkownError 
      } 
     } 
     return .unkownError 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo, with ts: TimeInterval) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo, with: ts) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getEventsFromLongpollServer(serverInfo: LongpollingServerInfo) -> Observable<Any> { 
     let url = buildLongPollingServerRoute(from: serverInfo) 
     let request = buldLongPollRequst(route: url) 
     let requestConvert = try? URLEncoding.default.encode(request!, with: nil) 
     return transport.makeRequest(request: requestConvert!) 
    } 

    private func getLongpollServerInfo() -> Observable<LongpollingServerInfo> { 
     let request = MessageRouter.getLongpollServer(useSsl: false, needPts: false) 
     return transport.makeModel(request: request) 
    } 

} 

答えて

1

を勉強し始めてからだから、あなたのような機能を持っていると仮定します:

func getData() -> Observable<Data> 

そして、あなたはそれはあなたができる、特定のperiodで長いポーリングします

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap(getData) 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

ことがより適切である場合は、MainScheduler以外のスケジューラを使用することができます。このような何かを行います。今

あなたもgetDataが発する可能性があり、あなたはそれが必ずしも長いポーリングを退会したくないというError Sを処理したい場合、あなたはこれを行うことができます。

func handleError(error: Error) -> Observable<Data> { 
    return Observable.empty() 
} 

Observable<Int>.interval(period, scheduler: MainScheduler.instance) 
    .map { _ in return } 
    .flatMap { return getData.catchError(handleError) } 
    .subscribe(/* ... handle data ... */) 
    .disposed(by: disposeBag) 

また、エラーを分析することができますhandleErrorを入力して、空のObservableを発行して続行するか、別のエラーを表示して長いポーリングを取り消すかを決定します。

+0

ありがとうございますが、私の目的には適していませんが、イベントは特定の周期性ではなく、サーバーに付随する最後のイベントの時刻から取得する必要があります。 subscribeToEvents(timeStamp: - タイムスタンプこれはサーバ上で発生した最後のイベントとしてのパラメータです。 – Zept

関連する問題