2017-05-15 4 views
1

エラー時に加入者を変更するには?私はデータベースからのコールドストリームを消費しています。次のケースを参照してください。エラーの観測可能な加入者の変更

return coctailBundleStream 
     .doOnNext(c -> { 
      hostnames.add(c.get(KEY_HOSTNAME)); // [A] 

      sendToOutboundQueue(c.get(KEY_CREDS)); 
      archiveSentMessage(c.get(KEY_CREDS), c.get(KEY_MESSAGE_ID)); 
     }) 
     .doOnComplete(this::saveCutOffTime) 
     .doOnError(e -> informUserImpactedHostnames(hostnames, 
      theRestOfHostnamesInside(credsXmlStream, e))) // I don't think this is right 
     .onErrorResumeNext(Flowable.empty()) 
     .count(); 

障害の影響を受けるすべてのホスト名を送信します。しかし、上記の私のコメントを参照してください。ストリームが2度消されるので、これは正しいとは思わない。 theRestOfHostnamesInsideの実装では、私は思いますcredsStream.map(c -> c.getHostname()), e)

ある場合たとえば、理想的には、エラーハンドラは、前のリストにリストを追加するリストにホスト名の残りの部分を抽出し、別のサブスクリプションを使用してストリームを継続すべきである(参照ラインがマークとともに])。

答えて

0

onErrorResumeNextを使用して、フォールバックするフローを提供する必要があります。

しかし、主な難点は重複を避けることです。あなたが言うようにソースが冷たい場合は、DBリクエストを再実行し、最初に購読されたシーケンスがエラーの前に何らかのデータを出した場合、同じデータが再度放出されます。

onErrorResumeNextの後にdistinctを連鎖することで軽減できます(重複を検出する方法を示すにはkeySelectorを指定する必要があります)。しかし、ソースからの2つの要素を重複としてマークしない基準を使用するようにしなければなりません(再試行によって作成された重複を排除するためです)。その周り

他の方法は、キーがまだない...コレクションの中で自分自身を処理し、onErrorResumeNextでこれらを除外していますが、確かコレクションはcount()の下流に作られた各subscribeに固有であることを特徴とする確認する必要があります格納することですそれは簡単です。

+0

私は同じDBリクエストをやり直すという考えが嫌いです。私にとっては効率的ではないようです。 – sancho21

+0

元のエラーの内容によって異なりますが、欠落したキーだけをクエリする2番目のシーケンスにフォールバックすることができます。エラーを起こしたシーケンス(存在する場合)のデータとフォールバックのデータを連結しますシーケンス、これは請求書に合う –

0

あなただけflatMap flatMapの内側だから、

 Observable.fromIterable(yourList) 
      .flatMap(x ->{ 
       Observable.just(x) 
         .map(data -> yourNormalSave(data)) 
         .onErrorReturn(errorResult) 
      }) 
      .subscribe(result ->{ 
         if(result != errorResult) 
          count++; 
         eles{ 
         //error received here 
         } 
      } 
      ); 

の内側にそれを行うことができ、あなたがエラーを満たしていれば、それは通常のタイプにそれを変更するが、下流のそれのないアイデアを持っていないでしょう。したがって、ダウンストリームの加入者はonNextでそれらを消費します。 x -> Observable.just(x)の代わりにそれを正しく行うことができます。

関連する問題