2016-04-07 4 views
0

回復力演算子で使用したときのサブジェクトの予想される動作、つまりretryretryWhenをよりよく理解したいと考えています。 4.0.7サブジェクトと復元力の論理ワークフロー

-

次のサンプルコードは、私は、これはバージョン4.0.0を使用して、容易に消費するための矢印機能とタイプを使用したことで(例えば、リンクに見られる)はJSBin対応物とは若干異なります私の予想弾力性の挙動は、以下のexampleで表現することができます。

Rx.Observable 
    .interval(1000) 
    .flatMap((count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count); 
    }) 
    .retry() 
    .take(5); 

Output 
// 0 
// 1 
// 2 
// 3 
// 0 <-- Retry means we start again from scratch (expected) 

この時点ですべてがエラーが(ステートレス建築家のために勝つ第四通知に最初からストリーム全体の再起動を発生した後つまり、一貫性があるまでure)。

我々は、マルチキャスト演算子を追加し、そうすることで基本的な件名(1のバッファと私の場合はReplaySubject)を追加した場合さてexample、傷私の頭の部分が来る:

const consumer : Rx.Observable<number> = Rx.Observable 
    .interval(1000) 
    .flatMap((count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count); 
    }) 
    .shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */ 
    .retry() 
    .take(5); 

const firstSubscriber : Rx.Disposable = consumer.subscribe((next:number) => { 
    console.log('first subscriber: ' + next); 
}); 

setTimeout(() => { 
    firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */ 
    const secondSubscriber : Rx.Disposable = consumer.subscribe((next) => { 
     console.log('second subscriber: ' + next); 
    }); 
}, 5000); 

Output (before error is thrown) 
// "first subscriber: 0" 
// "first subscriber: 1" 
// "first subscriber: 2" 
// "first subscriber: 3" 
Output (after error is thrown) 
// "first subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 

簡単に見Subjectにエラーが発生すると、被験者はinErrorとしてマークされ、将来の各サブスクライバは最後の通知(Line 46)を取得し、onErrorの呼び出しが直ちに行われます(Line 50)。

だからどこが私たちを離れるのでしょうか?私の意見では、Subject(shareReplay、publish etc ...)を含む他の演算子の後に弾力性演算子を使用することはできません。

このデザインで成功する唯一の方法は、エラーが発生してノードが破棄されたときに、サブジェクトが使用されるたびに新しいものを作成する必要があることを保証することですウサギの穴が始まります)?あなたがいうだけでしょうsubjectSelectorが呼び出されるそれぞれの新しいサブスクリプションの直接の対象に渡して、新しいConnectableObservableよりもsubjectSelectorを使用する場合sourceを見てみると

.multicast(() => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source); 

multicastは、工場/ subjectSelectorを取ることができます作成される(11行目)。

この時点で、サブジェクトの共有(あるキャッシュ経由)と廃棄(エラーが検出された場合)が実際にサブスクライバにマルチキャストを与えるかどうかはわかりませんか?

この時点までに、私はDisovering時にエラー状態を取り除いたRecoverableReplaySubjectも作成しました。これはテストのためのもので、RxJSチームがこのワークフローを正当な理由で入れることを期待していました。

このトピックについてのガイダンスと経験をお待ちしております。それはエラーと完了するために来るとき

おかげ

答えて

0

shareReplay被験者は異なる意味を保持します。例えば、基になる観測値が完了しても(refCount == 0)、shareReplayは完了しないので、それをさらに呼び出すと過去の値が生成(再生)されます。 Cf。 jsbin(shareReplay)対jsbin(シェア)

  • https://github.com/ReactiveX/rxjs/issues/1110
  • https://github.com/ReactiveX/rxjs/issues/453(これは長い議論です:

    var source = Rx.Observable 
         .interval(100) 
         .take(5) 
         .shareReplay() 
    
    var first = source.subscribe(function(next) { 
        console.log('first subscriber: ' + next); 
    }); 
    
    setTimeout(function() { 
    // first.dispose(); 
        var second = source.subscribe(function(next) { 
        console.log('second subscriber: ' + next); 
    }); 
    
    }, 1000); 
    

    そうしないと、あなたはそこ(あなたの問題の議論と)shareReplayの行動について説明対他の演算子があります、jhusain commented on Nov 4, 2015から始まる)

解決策は、工場の楽しみマルチキャストオペレータのために。いずれにしても、あなたの新しいデザインを試してみるのは難しいことではありません。

関連する問題