2016-11-28 18 views
0

私の質問は、Rxとキャッチ演算子に関係します。私は観測可能でタイムアウトがあり、タイムアウトが発生するたびに根底にある観測可能なもの(Catch)を再作成し、同じことを行う(タイムアウトとキャッチを追加する)とします。反応抑制の無限のキャッチ

以下にサンプルコードを貼り付けました。この例では、タイムアウトは2秒ごとに発生します。私の観察から、このコードは無限に働くことはできません。何らかの形でレクリエーションの後、何かが古い観測可能な残余物を参照しています。これらの残余は、キャッチが呼び出されるたびに蓄積されます。

ほとんどの疑わしい行が最後のものであり、何らかの自己参照が存在します。しかし、なぜそれが間違っているのか、私は実際に自分自身を視覚化することはできません。また、永遠に動作する同様のロジックで観測可能なものを作成する方法はありますか?

public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return recreateObservable() 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable)); 
    } 
+1

ように見える終わるかもしれないあなたのコードは、これは.NETの質問やJavaの質問ですか?私はrx-javaタグで混乱しています... –

+0

これはrx-netとrx-javaがうまくいくのでRx質問です。コード例はC# –

答えて

1

Retry()演算子を使用したいと思います。

あなたの最初のシーケンスはあなたの継続シーケンスと同じであると仮定します。

Observable.Return(1).Concat(Observable.Throw<int>(new Exception())) 
    .Retry() 

厳密な無限ループで実行されます。

createObservable() 
    .Timeout(TimeSpan.FromSeconds(2)) 
    .Retry() 
0

このようなことが可能です。私もこのアプローチのファンではない

Next: sourceAbc 
Next: sourceDef 
Next: backupHij 
Next: backupLmn 

//done in Linqpad, where async Main is allowed. 
async void Main() 
{ 
    var source = new Subject<string>(); 
    var backup = new Subject<string>(); 
    var reliableStream = source.CreateReliableStream(() => backup); 
    reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"),() => Console.WriteLine("Completed.")); 

    source.OnNext("sourceAbc"); 
    backup.OnNext("backupAbc"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    source.OnNext("sourceDef"); 
    backup.OnNext("backupDef"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    //Doesn't yield "Completed" because it's re-subscribing. 
    source.OnCompleted(); 
    backup.OnCompleted(); 

} 

public static class Ex 
{ 
    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f) 
    { 
     while(true) 
      yield return f(); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return InfiniteObservables(recreateObservable) 
      .Select(o => o.Timeout(TimeSpan.FromSeconds(2))) 
      .OnErrorResumeNext(); 
    } 
} 

は、次のような出力が得られます。 Rxはストリームターミネータのようなエラーを扱い、あなたはそれらを代替メッセージのように扱おうとしています。あなたはこのように川を泳ぐことになります。

関連する問題