2016-05-18 2 views
0

私はここから次のコードを使用しています - 私は次の呼び出しと、コードを変更する場合は、「リプレイキャッシュRxのキャッシュ - リプレイオペレータークリア

https://gist.github.com/leeoades/4115023

をクリアするには私には問題のように見えますこのように、Replayにバグがあることがわかります。つまり、決してクリアされません。誰かがこれを修正するために助けてくださいできますか?

private Cache<string> GetCalculator() 
    { 
     var calculation = Observable.Create<string>(o => 
     { 
      _calculationStartedCount++; 

      return Observable.Timer(_calculationDuration, _testScheduler) 
          .Select(_ => "Hello World!" + _calculationStartedCount) // suffixed the string with count to test the behaviour of Replay clearing 
          .Subscribe(o); 
     }); 

     return new Cache<string>(calculation); 
    } 

[Test] 
    public void After_Calling_GetResult_Calling_ClearResult_and_GetResult_should_perform_calculation_again() 
    { 
     // ARRANGE 
     var calculator = GetCalculator(); 

     calculator.GetValue().Subscribe(); 
     _testScheduler.Start(); 

     // ACT 
     calculator.Clear(); 

     string result = null; 
     calculator.GetValue().Subscribe(r => result = r); 
     _testScheduler.Start(); 

     // ASSERT 
     Assert.That(_calculationStartedCount, Is.EqualTo(2)); 
     Assert.That(result, Is.EqualTo("Hello World!2")); // always returns Hello World!1 and not Hello World!2 
     Assert.IsNotNull(result); 
    } 

答えて

2

問題は微妙です。ソースシーケンスTimerがイベントを発行すると完了し、Replayによって作成されたReplaySubjectの内部のOnCompletedが呼び出されます。 Subjectが完了すると、新しいObservableが表示されても、新しい値を受け入れなくなります。あなたは基礎となるObservableに再サブスクライブすると

それが再び実行されますが、ReplaySubjectが完了する前に、あなたの新しいObserverが唯一の最新の値を受け取ることができますので、Subjectを再起動することができません。

最も簡単な解決策は、おそらく(未テスト)完全なソースストリームをさせないように、次のようになります。

public Cache(IObservable<T> source) 
    { 
     //Not sure why you are wrapping this in an Observable.create 
     _source = source.Concat(Observable.Never()) 
          .Replay(1, Scheduler.Immediate); 
    }