2017-10-18 9 views
0

RxJavaで次のように表現する方法は苦労しています。RxJavaを使った同期シーケンス

PublishSubject<List<Packet>> packetsSubject = ...; 

そして、私は、次の送信機能を持っている::さんは私がネットワーク上で送信するパケットリストのストリームを持っている想像してみましょう

public Observable<Status> transmit(Packet p) {...} 

私のように、リストの各パケットを送信したいですパケットの返された状態がStatus.OKである限り長い。言い換えれば、n番目のパケットの送信がNOKである場合、n番目の+ 1パケットは送信されるべきではない。

また、エラーが検出された場合:

  • エラーがリスト内のパケットのインデックスで表示されるべき
  • パケットの次のリストの転送を開始する必要が

ありがとうございます

+0

'concat'演算子を使用しようとしましたか?エラー処理のために新しいリストは古いものと関係がありますか? – masp

+0

2つのリストには何も共通点がありません。リクエストペイロードをスライスしてパケットのリストを取得しています。連結演算子はどのように画像に収まるでしょうか? – n1r3

+0

私は 'concat'ではリクエストがリスト内の同じ順番で発生すると思います。いずれかが失敗した場合は、次のアイテムを試しません。 – masp

答えて

1

flatMap()演算子を使用して、リストを観測可能なエントリに変換することができます。 zipWith()演算子を使用して、各リストエントリを個々のリスト内のパケットのインデックスとペアにします。 flatMap()を再度送信し、結果のステータスを取得します。 NOKをスロー可能にして、内部のオブザーバーチェーンを終了させます。 flatMap()演算子を使用する場合、最終パラメータ1を追加するとマッピングが1回発生するため、このオブザーバ・チェーンに要求が重複することはありません。

packetsSubject 
     .flatMap(listOfPackets -> Observable.from(listOfPackets) 
      .zipWith(Observable.range(1, LARGE_NUMBER), (p, i) -> new Pair<>(p, i)) 
      .flatMap(pair -> transmits(pair.getFirst()) 
        .flatMap(status -> status.equals("OK") 
          ? Observable.empty() 
          : Observable.error(new IllegalStateException())) 
        .doOnError(error -> System.out.println("Error at index " + pair.getSecond())) 
        .onErrorResumeNext(Observable.empty()), 1)) 
    .subscribe(); 

私はそれをログ、エラーにNOKをマッピングする最後の内部段階で不快な思い、再び結果を再マッピングが、それのための理由があります。 listOfPacketsの処理は、すべてのパケットが処理されたとき、またはStatus.NOKが見られるときに終了する必要があります。 Status.NOKをエラーインジケータに変換すると、これが実行されます。しかし、私たちは、個々のリストの処理を終了させ、観察可能なもの全体を終了させたくないだけです。

+0

flatMap()that shit!ありがとう、非常に助けてください。 – n1r3