2016-11-24 10 views
0

UDPソケットを聴く必要があります.10秒後またはバッファ内の100アイテム後に、いくつかのロジックを呼び出す必要があります。一般的には正常に動作しますが、リスニングソケットを正しく停止する方法はわかりません。RxExtensionsでUDPソケットをリッスンするのを止める適切な方法

var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234); 
var socket = new UdpClient(ip); 

var cancellationTokenSource = new CancellationTokenSource(); 
var observable = 
    Observable 
    .FromAsync(socket.ReceiveAsync) 
    .DoWhile(() => !cancellationTokenSource.IsCancellationRequested) 
    .Buffer(TimeSpan.FromSeconds(10), 100); 

var subscribtion = observable.Subscribe(o => 
{ 
    //logic 
}); 

//simulate close method from another thread 
Task.Factory.StartNew(() => 
{ 
    Task.Delay(TimeSpan.FromSeconds(12)).Wait(); 
    cancellationTokenSource.Cancel(); 
    socket.Close(); 
    subscribtion.Dispose(); 
}); 

私は閉じたソケットをシミュレートすると、処理できないバッファ内の一部のデータが存在する状況が存在するが - 、この動作を回避するために、どのような方法ですか?

私は500msの遅延で別のプロセスからいくつかのメッセージを送信し、それは以下の例のような作品になります。

  1. 20のメッセージが収入
  2. いくつかのロジックを呼び出すことになるだろう - 加入者のロジックを
  3. 4のメッセージ収入
  4. は「closeメソッドは、」起動された場合、closeメソッドが

を起動しますシミュレートします私はすぐにバッファタイムアウトを待つことなく、バッファとクローズアプリケーションのすべてのデータを処理する必要があります。バッファ遅延時間はユーザーによって定義されるため、かなり長い時間がかかる可能性があるため、invokeサブスクライバロジックを待つ必要はありません。

+0

あなたは[mcve]を提供してもよろしいですか?私はあなたの問題を示すいくつかのコードを実行したいと思います。その後、修正することができます。私は解決策がTPLとRxを混ぜる**ではないと言っています。 Rxはより強力で、それはあなたが集中すべきものです。 – Enigmativity

+0

ありがとうございます。たぶん私は私の質問を再質問し、Rxにもっと集中してみてください。私の例ではObservable with Bufferメソッドを使用しました。これは、10秒ごとにまたはバッファ内の100項目の加入者に通知します。しかし、私は誰かが私のアプリを閉じるときに特別な状況があります。それは指定された時間に発生する可能性がありますので、例えば、最後のサブスクライバからの3秒目が通知され、バッファ内にデータが存在する場合は、バッファからすべてのデータを処理するために7秒間アプリを保持する必要があります。 Observableクラス(要望に応じて)に言える方法はありますか?あなたのソースを停止し、最後にユーザーに通知します。 – tom

+0

「あなたのソースを停止し、最後に加入者に知らせる」とは何を意味しますか? – Enigmativity

答えて

1

ようこそStackOverflow!

バッファメソッドの既存のオーバーロードは、時間、カウント、ゲートをサポートしていません。しかし、シーケンスが値を生成するときにバッファークローズをトリガーするオーバーロードがあります。したがって、バッファクローズのためのすべての条件のオブザーバブルをマージしてシーケンスを作成するだけです。

このデモをご覧ください。

onTimeは、指定した期間の後に値を生成します。

onCountは、xアイテムが経過した後に最初の値を生成します。

onCloseはすぐにsubscribeに値を生成しますが、私たちが決定するまでは接続しません。

  var producer = Observable.Interval(TimeSpan.FromSeconds(0.2)); 
      var source = producer.Publish().RefCount(); 

      var onClose = Observable.Return(0L).Publish(); 
      var onTime = Observable.Timer(TimeSpan.FromSeconds(2)); 
      var onCount = source.Skip(10); 

      var bufferClose = Observable.Merge(onClose, onTime, onCount); 

     var subscription = 
      source 
      .Buffer(() => bufferClose) 
      .Subscribe(list => Console.WriteLine(string.Join(",", list))); 

      Console.WriteLine("Waiting for close"); 
      Console.ReadLine(); 

      onClose.Connect(); //signal 
      subscription.Dispose(); 

      Console.WriteLine("Closed"); 

      Console.ReadLine(); 

リターンが押されるまでは、それがすぐにバッファで利用できるものと閉じたときに、カウントまたは時間に基づいて出力を生成します。

+0

明日私は解決策を試してみるが、非常に有望に見える。ありがとうございました。 – tom

+0

@tomどのようになったのですか? – Asti

関連する問題