2017-10-30 24 views
0

私は中断から再開できるネットワーククライアントを持っていますが、再試行があるときに最後のメッセージが必要です。 KotlinでRxJava2の再試行オペレータの状態を記憶する方法

例:

fun requestOrResume(last: Message? = null): Flowable<Message> = 
    Flowable.create({ emitter -> 
     val connection = if (last != null) 
          client.start() 
         else 
          client.resumeFrom(last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 

requestOrResume() 
    .retryWhen { it.flatMap { Flowable.timer(5, SECONDS) } } 
    // how to pass the resume data when there is a retry? 

質問:あなたが見ることができるように、私は履歴書の呼び出しを準備するために、最後に受信したメッセージを必要としています。再試行があるときに再開要求を行うことができるように、どのように追跡することができますか?

可能な解決策の1つは、最後のメッセージへの参照を保持し、新しいメッセージを受信したときに更新されるホルダークラスを作成することです。これにより、再試行があるときに、最後のメッセージを所有者から得ることができる。例:

class MsgHolder(var last: Message? = null) 

fun request(): Flowable<Message> { 
    val holder = MsgHolder() 
    return Flowable.create({ emitter -> 
     val connection = if (holder.last != null) 
          client.start() 
         else 
          client.resumeFrom(holder.last.id) 

     while (!emitter.isDisposed) { 
      val msg = connection.nextMessage() 
      holder.last = msg // <-- update holder reference 
      emitter.onNext(msg) 
     } 
    }, BackpressureStrategy.MISSING) 
} 

これはうまくいくと思いますが、ハック(スレッド同期の問題?)のように感じます。

状態を追跡して再試行できる方法がありますか? link あなたはこのようにそれを使用することができます::

答えて

1

注オブジェクトあなたの最後の要素の周りのラッパー(既存の「ハック」とはあまりにも機能的ではありませんが、ソリューションは醜いですが)、エラー処理オペレーターは外部の助けなしに最後の要素を回復することはできません。 Throwable。代わりに、次の再帰的なアプローチは、あなたのニーズに合ったかどうかを確認:

fun retryWithLast(seed: Flowable<Message>): Flowable<Message> { 
    val last$ = seed.last().cache(); 
    return seed.onErrorResumeNext { 
    it.flatMap { 
     retryWithLast(last$.flatMap { 
     requestOrResume(it) 
     }) 
    } 
    }; 
} 
retryWithLast(requestOrResume()); 

の最大の違いは、むしろ値に手動で行うよりもcacheで観察して最後の試みから末尾の値をキャッシュしています。また、エラーハンドラの再帰手段retryWithLastは、その後の試行が失敗した場合でもストリームを拡張し続けることに注意してください。

0

buffer()オペレータに近い見てみましょう

requestOrResume() 
    .buffer(2) 

今から、あなたのFlowable 2 latestsのでList<Message>を返します。あなたが再スローしない限り、という

+0

この状況で 'buffer'演算子がどのように役立つのか分かりません。 – ESala

関連する問題