2017-10-28 9 views
0

私はRX.netで背圧問題を遭遇しましたが、解決策が見つかりません。私は観測可能なリアルタイムのログメッセージを持っています。私は、彼らがワイヤを介して送信される前に、logObservableからリアルタイムのログメッセージをシリアライズTCPインタフェースを介して公開するRX.netのDropQueueメカニズム

var logObservable = /* Observable stream of log messages */ 

。だから私は、次の手順を実行します

foreach (var message in logObservable.ToEnumerable()) 
{ 
    // 1. Serialize message 
    // 2. Send it over the wire. 
} 

背圧のシナリオは、例えば発生した場合、問題が発生して.ToEnumerable()相手側のクライアントがストリームを一時停止する場合問題は、.ToEnumerable()がメモリを大量に使用するアイテムをキャッシュすることです。私はDropQueueのようなものを探しています。最後の10個のメッセージをバッファリングするだけです。

var observableStream = logObservable.DropQueue(10).ToEnumerable(); 

これはこの問題を解決する正しい方法ですか?背圧の問題を避けるために、このような仕組みを実装することを知っていますか?

+0

'.take(10).toenumerable()'それはないだろう働くだろうか? –

+0

私は、連続したログメッセージのストリームを望みます。もし私があなたが提案したようにすれば、それは10のログメッセージを取って観察可能なストリームを完成させるだけではありませんか?私が解決しようとしている問題は、クライアントがログメッセージを取得するには時間がかかりすぎるか、ストリームだけをキャッシュする必要がある場合です。無制限数の項目の代わりに10項目。 – SOK

+0

'.Throttle(...)'や '.Sample(..)'はどうですか? – Enigmativity

答えて

0

マイDropQueue実装:

public static IEnumerable<TSource> ToDropQueue<TSource>(
     this IObservable<TSource> source, 
     int queueSize, 
     Action backPressureNotification = null, 
     CancellationToken token = default(CancellationToken)) 
    { 
     var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize); 
     var isBackPressureNotified = false; 

     var subscription = source.Subscribe(
      item => 
      { 
       var isBackPressure = queue.Count == queue.BoundedCapacity; 

       if (isBackPressure) 
       { 
        queue.Take(); // Dequeue an item to make space for the next one 

        // Fire back-pressure notification if defined 
        if (!isBackPressureNotified && backPressureNotification != null) 
        { 
         backPressureNotification(); 
         isBackPressureNotified = true; 
        } 
       } 
       else 
       { 
        isBackPressureNotified = false; 
       } 

       queue.Add(item); 
      }, 
      exception => queue.CompleteAdding(), 
      () => queue.CompleteAdding()); 

     token.Register(() => { subscription.Dispose(); }); 

     using (new CompositeDisposable(subscription, queue)) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
      { 
       yield return item; 
      } 
     } 
    } 
関連する問題