2017-02-19 5 views
1

配列を持っていますが、値の型は無関係です。私がしたいのは、x秒ごとに1つの値を出すことです。その値を持つ関数を呼び出すと、その関数が何らかの理由で失敗した場合は、y秒後に再試行します(単純な定数、ここではインクリメンタルなものは必要ありません) 。私はここにかかわらず、これまでRxJSはx秒ごとに配列から値を出力し、その値を持つ関数を呼び出し、失敗した場合は再試行します。

Rx.Observable 
    .interval(500) 
    .take(arr.length) 
    .map(idx => arr[idx]) 
    .flatMap(dt => randomFunc(dt)) 
    .catch(e => conosle.log(e)) 
    .retry(5) 
    .subscribe(); 

function randomFunc(dt) { 
    return Rx.Observable.create(observer => { 
     if (dt === 'random') { 
      return observer.error(`error`); 
     } else { 
      return observer.next(); 
     } 
    }); 
} 

2の問題点を持っている何

1:randomFuncがエラーを返した場合、それは全体のチェーンをやり直すようです。私は失敗した人だけが再試行する必要があります。

2:catchエラーで再試行したように見えますが、実際にはエラーは記録されません。唯一失敗したものを再試行することを見えたが、それでもすべてのエラーを記録し、私はしませんでした

Rx.Observable 
    .interval(500) 
    .take(arr.length) 
    .map(idx => arr[idx]) 
    .switchMap(dt => randomFunc(dt) 
     .catch(e => conosle.log(e)) 
     .retry(5) 
    ) 
    .subscribe(); 

この方法:私はこのようなswitchMapの代わりflatMapを試みた最初の問題については

確かにswitchMapはここで良いです(私は本当にRXのnoobです)。

ありがとうございます、ありがとうございます!

+1

コンソールのスペルが間違っているため、「キャッチ」がログに記録されない可能性があります:-) conosle => console – Ashley

答えて

1

を物事のカップルが知っておくためにそこにいます。 retry()オペレータはソースに再登録するだけで、繰り返し全体を開始したくない場合は、async関数をチェーンにマージ/連結することができます。

Rx.Observable.from(arr) 
    .concatMap(val => { 
    let attempts = 0; 

    return Rx.Observable.of(val) 
     .delay(500) 
     .concatMap(val => randomFunc(val) 
     .catch((err, caught) => { 
      console.log('log error'); 
      if (attempts++ === 1) { 
      return Rx.Observable.of(err); 
      } else { 
      return caught; 
      } 
     }) 
    ); 

    }) 
    .subscribe(val => console.log(val)); 

function randomFunc(dt) { 
    return Rx.Observable.create(observer => { 
    if (dt === 'random') { 
     observer.error(`error received ${dt}`); 
    } else { 
     observer.next(dt); 
     observer.complete(); 
    } 
    }); 
} 

はライブデモを参照してください:https://jsbin.com/qacamab/7/edit?js,console

はこれがコンソールに出力します。

1 
2 
3 
4 
log error 
log error 
error received random 
6 
7 
8 
9 
10 

catch()オペレータが最も重要な部分です。 -

  • caughtを発生したエラー - 元観察可能

    • err:そのセレクタ機能は2つの引数を取ります。

    我々はセレクタ機能からcaughtを返す場合、我々は単に(​​と同じである)観測可能なソースに再サブスクライブします。それぞれのエラーメッセージを記録したいので、retry()の代わりにcatch()を使用する必要があります。 Rx.Observable.of(err)を返すことで、エラーがさらに伝播し、それがサブスクライバによってnext通知として受信されます。 Observable.empty()を返すだけでエラーを無視することもできます。

  • 1

    全体のチェーンが をやり直すようだエラーを返しますrandomFunc。再試行に失敗したものだけが必要です。

    Observablesを組み合わせると、エラーも伝搬され、キャッチされないエラーはサブスクリプションを解除します。

    switchMapの中にキャッチを使用するあなたの考えは正しいです。 switchMapだけ一度に観察可能 1を平らにしますが、次の値がマッピングされている場合でも、

    // Observable from array 
    Rx.Observable.from(arr) 
        .concatMap(value => 
         // Put a 500 ms delay between each value 
         Rx.Observable.timer(500).map(_ => value) 
        ) 
        .flatMap(dt => 
         randomFunc(dt) 
         .retryWhen(errs => 
          errs 
          .do(err => console.error(err)) 
          // Retry at most 5 times 
          .take(5) 
          // Retry after 500ms 
          .delay(500) 
         ) 
        ) 
        .subscribe(); 
    

    キャッチが実際にエラーを記録したことがない以前の観測が解除されます(それはを切り替えています) エラーで再試行しているようですが。

    キャッチするために渡された関数が観測例えば返す必要があります:

    Observable.throw(new Error()) 
        .catch(e => 
         (console.error(e), Observable.of('backup value')) 
        ) 
        .subscribe(); 
    

    http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-catch

    関連する問題