UDPソケットを聴く必要があります.10秒後またはバッファ内の100アイテム後に、いくつかのロジックを呼び出す必要があります。一般的には正常に動作しますが、リスニングソケットを正しく停止する方法はわかりません。RxExtensionsでUDPソケットをリッスンするのを止める適切な方法
var ip = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1234);
var socket = new UdpClient(ip);
var cancellationTokenSource = new CancellationTokenSource();
var observable =
Observable
.FromAsync(socket.ReceiveAsync)
.DoWhile(() => !cancellationTokenSource.IsCancellationRequested)
.Buffer(TimeSpan.FromSeconds(10), 100);
var subscribtion = observable.Subscribe(o =>
{
//logic
});
//simulate close method from another thread
Task.Factory.StartNew(() =>
{
Task.Delay(TimeSpan.FromSeconds(12)).Wait();
cancellationTokenSource.Cancel();
socket.Close();
subscribtion.Dispose();
});
私は閉じたソケットをシミュレートすると、処理できないバッファ内の一部のデータが存在する状況が存在するが - 、この動作を回避するために、どのような方法ですか?
私は500msの遅延で別のプロセスからいくつかのメッセージを送信し、それは以下の例のような作品になります。
- 20のメッセージが収入
- いくつかのロジックを呼び出すことになるだろう - 加入者のロジックを
- 4のメッセージ収入
- は「closeメソッドは、」起動された場合、closeメソッドが
を起動しますシミュレートします私はすぐにバッファタイムアウトを待つことなく、バッファとクローズアプリケーションのすべてのデータを処理する必要があります。バッファ遅延時間はユーザーによって定義されるため、かなり長い時間がかかる可能性があるため、invokeサブスクライバロジックを待つ必要はありません。
あなたは[mcve]を提供してもよろしいですか?私はあなたの問題を示すいくつかのコードを実行したいと思います。その後、修正することができます。私は解決策がTPLとRxを混ぜる**ではないと言っています。 Rxはより強力で、それはあなたが集中すべきものです。 – Enigmativity
ありがとうございます。たぶん私は私の質問を再質問し、Rxにもっと集中してみてください。私の例ではObservable with Bufferメソッドを使用しました。これは、10秒ごとにまたはバッファ内の100項目の加入者に通知します。しかし、私は誰かが私のアプリを閉じるときに特別な状況があります。それは指定された時間に発生する可能性がありますので、例えば、最後のサブスクライバからの3秒目が通知され、バッファ内にデータが存在する場合は、バッファからすべてのデータを処理するために7秒間アプリを保持する必要があります。 Observableクラス(要望に応じて)に言える方法はありますか?あなたのソースを停止し、最後にユーザーに通知します。 – tom
「あなたのソースを停止し、最後に加入者に知らせる」とは何を意味しますか? – Enigmativity