2010-12-22 1 views
2

クリーンアップ:反応性拡張機能は、次のようなRXを使用して通話の長鎖がある場合

var responses = collectionOfHttpRequests.ToObservable() 
.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse) 
.Select(res => res.GetResponseBodyString()) // Extension method to get the body of the request 
.Subscribe(); 

、その後の操作はあなたが処分を呼び出す完了する前に、HTTPリクエストがキャンセルされます、閉じをし、処分さ正しく、またはメソッドチェーンから何らかの形でhttprequestsを選択して個別に処分する必要がありますか?

私は、一度に複数のHTTPリクエストが発生する可能性があり、ネットワークトラフィックを節約するために一部またはすべてをキャンセル(無視しない)する必要があります。

+1

あなたのコードがいるようだが間違っている: 'IObservable <> 'で呼び出し可能な拡張メソッド' FromAsyncPattern'はありません。その方法は静的です。拡張ではありません。実際のコードを思いついて時間をかけると、問題と解決策の両方が明らかになります。 –

+0

@Fryodor - あなたは 'FromAsyncPattern'について全く正しいのですが、正しいメソッドシグニチャを使用するとOPの洞察をどのようにして非同期要求をキャンセルするのか分かりません。 –

答えて

2

Rx演算子チェーンは、シーケンスが完了するとエラーを起こすか、サブスクリプションが破棄されます。しかし、各オペレータは、クリーンアップする予定のものだけをクリーンアップします。たとえば、FromEventはイベントの登録を解除します。

キャンセルの場合、Begin/End asynchronous patternではキャンセルがサポートされていないため、キャンセルすることはありません。ただし、Finallyを使用してHttpWebRequest.Abortに電話することはできます。

var observableRequests = collectionOfHttpRequests.ToObservable(); 

var responses = observableRequests 
    .SelectMany(req => 
     Observable.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)() 
    ) 
    .Select(resp => resp.GetResponseBodyString()) 
    .Finally(() => 
    { 
     observableRequests 
      .Subscribe(req => req.Abort()); 
    }) 
    .Subscribe(); 
2

私はRichard Szalayの解決策を受け入れることはできません。 100の要求を開始し、2番目の要求がサーバー使用不能エラーで失敗した場合(たとえば、残りの98個の要求が中止された場合など) 2番目の問題は、UIがそのような観測可能性にどのように反応するかです。あまりにも悲しい。

Rx Design Guidelinesの4.3章を覚えておいて、私はObservable.Using()演算子でWebRequestを観測可能と表現したいと考えました。しかしWebRequestは使い捨てではありません!だから私はDisposableWebRequest定義:

public class DisposableWebRequest : WebRequest, IDisposable 
{ 
    private static int _Counter = 0; 

    private readonly WebRequest _request; 
    private readonly int _index; 

    private volatile bool _disposed = false; 

    public DisposableWebRequest (string url) 
    { 
     this._request = WebRequest.Create(url); 
     this._index = ++DisposableWebRequest._Counter; 
    } 

    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state) 
    { 
     return this._request.BeginGetResponse(callback, state); 
    } 

    public override WebResponse EndGetResponse(IAsyncResult asyncResult) 
    { 
     Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId)); 
     Trace.Flush(); 
     if (!this._disposed) 
     { 
      return this._request.EndGetResponse(asyncResult); 
     } 
     else 
     { 
      return null; 
     } 
    } 

    public override WebResponse GetResponse() 
    { 
     return this._request.GetResponse(); 
    } 

    public override void Abort() 
    { 
     this._request.Abort(); 
    } 

    public void Dispose() 
    { 
     if(!this._disposed) 
     { 
      this._disposed = true; 

      Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId)); 
      Trace.Flush(); 
      this.Abort(); 
     } 
    } 
} 

それから私は、WPFウィンドウを作成し、(開始と停止)その上の2つのボタンを置きます。

だから、適切なリクエストオブザーバブルコレクションを作成しましょう。まず 、URLの観察可能な関数を作成定義:

 Func<IObservable<string>> createUrlObservable =() => 
      Observable 
       .Return("http://yahoo.com") 
       .Repeat(100) 
       .OnStartup(() => 
       { 
        this._failed = 0; 
        this._successed = 0; 
       }); 

私たちは、obervable WebRequestクラスを作成する必要があり、すべてのURLを:

 Func<string, IObservable<WebResponse>> createRequestObservable = 
      url => 
      Observable.Using(
       () => new DisposableWebRequest("http://yahoo.com"), 
       r => 
       { 
        Trace.WriteLine("Queued " + url); 
        Trace.Flush(); 
        return Observable 
         .FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)(); 
       }); 

またボタンに反応する2つのイベントの観測「スタート」/」を定義しますクリック「停止:

 var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click"); 
     var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click"); 

だから、レンガている準備ができて、時間がそれらを構成する(私は直後ビューコンストラクタでそれを行うのInitializeComponent()):

 startMouseDown 
      .SelectMany(down => 
       createUrlObservable() 
        .SelectMany(url => createRequestObservable(url) 
         .TakeUntil(stopMouseDown) 
         .Select(r => r.GetResponseStream()) 
         .Do(s => 
          { 
           using (var sr = new StreamReader(s)) 
           { 
            Trace.WriteLine(sr.ReadLine()); 
            Trace.Flush(); 
           } 

          }) 
         .Do(r => this._successed++) 
         .HandleError(e => this._failed++)) 
         .ObserveOnDispatcher() 
         .Do(_ => this.RefresLabels(), 
          e => { }, 
          () => this.RefresLabels()) 

         ) 
      .Subscribe(); 

関数 "HandleError()"に不思議に思うかもしれません。 EndGetResponse()呼び出しで例外が発生した場合(私はそれを再現するためにネットワーク接続をオフにしました)、要求に応じてキャッチされず、観測可能である場合、startMouseDown observableがクラッシュします。HandleErrorは、黙って、例外をキャッチするアクションを提供perfom、代わりに次の観測のためのコールONERRORのそれはOnCompleted呼び出します

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler) 
    { 
     return Observable.CreateWithDisposable<TSource>(observer => 
      { 
       return source.Subscribe(observer.OnNext, 
        e => 
        { 
         errorHandler(e); 
         //observer.OnError(e); 
         observer.OnCompleted(); 
        }, 
        observer.OnCompleted); 
      }); 
    } 
} 

最後原因不明のところがされているメソッドのRefreshLabels、UIコントロール更新:

private void RefresLabels() 
    { 
     this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed); 
     this.FailedLabel.Content = string.Format("Failed {0}", this._failed); 
    } 
関連する問題