2017-06-18 11 views
1

私はrxを使い慣れていて、ドットネットでリアクティブエクステンションを使っていくつかのネットワークコードに取り組んでいます。私の問題は、非同期関数で作成したtcpClientのオブザーバブルが、私が供給しているトークンを介して取り消しをトリガするときに期待通りに完了していないことです。私はこれを実行すると、ローカルホストに接続する場合非同期関数で作成されたobservableを取り消す

public static class ListenerExtensions 
{ 
    public static IObservable<TcpClient> ToListenerObservable(
     this IPEndPoint endpoint, 
     int backlog) 
    { 
     return new TcpListener(endpoint).ToListenerObservable(backlog); 
    } 

    public static IObservable<TcpClient> ToListenerObservable(
     this TcpListener listener, 
     int backlog) 
    { 
     return Observable.Create<TcpClient>(async (observer, token) => 
     { 
      listener.Start(backlog); 

      try 
      { 
       while (!token.IsCancellationRequested) 
        observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token)); 
       //This never prints and onCompleted is never called. 
       Console.WriteLine("Completing.."); 
       observer.OnCompleted(); 
      } 
      catch (System.Exception error) 
      { 
       observer.OnError(error); 
      } 
      finally 
      { 
       //This is never executed and my progam exits without closing the listener. 
       Console.WriteLine("Stopping listener..."); 
       listener.Stop(); 
      } 
     }); 
    } 
} 
class Program 
{ 
    static void Main(string[] args) 
    { 
     var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323); 
     var cancellation = new CancellationTokenSource(); 

     home.ToListenerObservable(10) 
      .Subscribe(
       onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"), 
       onError: e => Console.WriteLine($"Error: {e.Message}"), 
       onCompleted:() => Console.WriteLine("Complete"), // Never happens 
       token: cancellation.Token); 

     Console.WriteLine("Any key to cancel"); 
     Console.ReadKey(); 
     cancellation.Cancel(); 
     Thread.Sleep(1000); 
    } 
} 

:ここで私はとの問題を抱えているコードの簡易版である2323年は、私が接続されているtcpClientsのシーケンスを取得することがわかります。しかし、私がcancelationトークンのキャンセルをトリガーすると、プログラムはリスナーを閉じずに、期待どおりにonCompletedイベントを出さずに終了します。私は間違って何をしていますか?

+0

このコードはキャンセルされるため、キャンセルすると 'OperationCancelledException'がスローされます。 –

+0

https://stackoverflow.com/questions/14524209/what-is-the-correct-way-to-cancel-an-async-operation-that-doesnt-accept-a-cance –

+0

そして、これを見ると[Stephen Clearyの投稿](http://blog.stephencleary.com/2015/03/a-tour-of-task-part-9-delegate-tasks.html「タスクツアー、第9回:タスクの委任」 )、あなたは 'Task.Run'にキャンセルトークンを渡すことは、あなたが思うかもしれないものとまったく同じではないことがわかります。 –

答えて

1

だから私はここでいくつかのことを混乱させた。 This articleが私の進路を助けました。実際、そこからコードをコピーしてしまいました。

public static class TaskCancellations 
{ 
    public static async Task<T> WithCancellation<T>(
     this Task<T> task, 
     CancellationToken token) 
    { 
     var cancellation = new TaskCompletionSource<bool>(); 
     using (token.Register(s => 
      (s as TaskCompletionSource<bool>).TrySetResult(true), cancellation)) 
     { 
      if (task != await Task.WhenAny(task, cancellation.Task)) 
       throw new OperationCanceledException(token); 
      return await task; 
     } 
    } 
} 

そして、このようなtcplistenerでそれを使用します。今、予想通り

public static IObservable<TcpClient> ToListenerObservable(
    this TcpListener listener, 
    int backlog) 
{ 
    return Observable.Create<TcpClient>(async (observer, token) => 
    { 
     listener.Start(backlog) 
     try 
     { 
      while (!token.IsCancellationRequested) 
      { 
       observer.OnNext(await listener.AcceptTcpClientAsync() 
        .WithCancellation(token)); 
      } 
     } 
     catch (OperationCanceledException) 
     { 
      Console.WriteLine("Completing..."); 
      observer.OnCompleted(); 
     } 
     catch (System.Exception error) 
     { 
      observer.OnError(error); 
     } 
     finally 
     { 
      Console.WriteLine("Stopping listener..."); 
      listener.Stop(); 
     } 
    }); 
} 

すべてが動作します。

+1

私は 'TcpListener' APIに精通していませんが、' using(token.Register(()=> listener.Stop())) 'のようなものを使うべきだと思います。 –

1

あまりにも多くのコードを書くことは避けてください。キャンセルトークンを使用してください。ここでは、標準のRxオペレータから離れることなく、あなたがやっていることをやり遂げる方法があります。私はこのコードを完全にテストすることができませんでしたので、少し微調整が必​​要な場合があります。ここで重要なことは、キャンセルトークンのように尋ねcompleterある

var query = Observable.Create<TcpClient>(o => 
{ 
    var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323); 
    var listener = new TcpListener(home); 
    listener.Start(); 
    return 
     Observable 
      .Defer(() => Observable.FromAsync(() => listener.AcceptTcpClientAsync())) 
      .Repeat() 
      .Subscribe(o); 
}); 

var completer = new Subject<Unit>(); 
var subscription = 
    query 
     .TakeUntil(completer) 
     .Subscribe(
      onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"), 
      onError: e => Console.WriteLine($"Error: {e.Message}"), 
      onCompleted:() => Console.WriteLine("Complete")); 

Console.WriteLine("Enter to cancel"); 
Console.ReadLine(); 
completer.OnNext(Unit.Default); 
Thread.Sleep(1000); 

はこれを試してみてください。それはOnCompletedコールなしで終了するsubscription.Dispose()とは異なり、自然にサブスクリプションを終了します。

+0

それは本当にクールです。私はこのように使い捨てを作った: 'var stop = Disposable.Create(>)listener.Stop());' 次に、CompositeDisposeを返します。今は、プログラムを終了する前にstop()を呼び出すことがどれほど重要であるかわかりません。最初は、直後に同じソケットにリスナーを起動できる必要があると思っていました。それは私が正しく理解することを保証しないことが判明しました。オペレーティングシステムは、プログラムが待機状態で終了してから最大で4分間開いたままにすることがあります。私はそれを終了した直後にリスナーを再実行すると、これは私に "使用中のアドレス"エラーを与えると思います。 –

関連する問題