2012-04-21 6 views
7

HttpWebRequestを使用して1つのアプリケーションサーバーに向けて並列要求を行う場合は、スロットルメカニズム(1秒あたりの要求数)を実装する必要があります。私のC#アプリは、リモートサーバに毎秒80リクエスト以上を発行してはならない。この制限は、リモートサービス管理者によって厳しい制限ではなく、私のプラットフォームとそのシステム間の「SLA」として課せられます。Webサーバーに向けて毎秒HttpWebRequestの数を制限する方法は?

HttpWebRequestを使用すると、1秒あたりのリクエスト数をどのように制御できますか?

答えて

3

私は同じ問題を抱えており、解決策を見つけることができませんでした。考え方はBlockingCollection<T>を使用して処理が必要なアイテムを追加し、リアクティブエクステンションを使用してレート制限プロセッサーを購読することです。

スロットルクラスはthis rate limiter

public static class BlockingCollectionExtensions 
{ 
    // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed) 
    public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken) 
    { 
     Subject<T> subject = new Subject<T>(); 

     // this is a dummyToken just so we can recreate the TokenSource 
     // which we will pass the proxy class so it can cancel the task 
     // on disposal 
     CancellationToken dummyToken = new CancellationToken(); 
     CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken); 

     var consumingTask = new Task(() => 
     { 
      using (var throttle = new Throttle(items, timePeriod)) 
      { 
       while (!sequence.IsCompleted) 
       { 
        try 
        { 
         T item = sequence.Take(producerToken); 
         throttle.WaitToProceed(); 
         try 
         { 
          subject.OnNext(item); 
         } 
         catch (Exception ex) 
         { 
          subject.OnError(ex); 
         } 
        } 
        catch (OperationCanceledException) 
        { 
         break; 
        } 
       } 
       subject.OnCompleted(); 
      } 
     }, TaskCreationOptions.LongRunning); 

     return new TaskAwareObservable<T>(subject, consumingTask, tokenSource); 
    } 

    private class TaskAwareObservable<T> : IObservable<T>, IDisposable 
    { 
     private readonly Task task; 
     private readonly Subject<T> subject; 
     private readonly CancellationTokenSource taskCancellationTokenSource; 

     public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource) 
     { 
      this.task = task; 
      this.subject = subject; 
      this.taskCancellationTokenSource = tokenSource; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var disposable = subject.Subscribe(observer); 
      if (task.Status == TaskStatus.Created) 
       task.Start(); 
      return disposable; 
     } 

     public void Dispose() 
     { 
      // cancel consumption and wait task to finish 
      taskCancellationTokenSource.Cancel(); 
      task.Wait(); 

      // dispose tokenSource and task 
      taskCancellationTokenSource.Dispose(); 
      task.Dispose(); 

      // dispose subject 
      subject.Dispose(); 
     } 
    } 
} 

ユニットテストの名前を変更したバージョンです:質問に

class BlockCollectionExtensionsTest 
{ 
    [Fact] 
    public void AsRateLimitedObservable() 
    { 
     const int maxItems = 1; // fix this to 1 to ease testing 
     TimeSpan during = TimeSpan.FromSeconds(1); 

     // populate collection 
     int[] items = new[] { 1, 2, 3, 4 }; 
     BlockingCollection<int> collection = new BlockingCollection<int>(); 
     foreach (var i in items) collection.Add(i); 
     collection.CompleteAdding(); 

     IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None); 
     BlockingCollection<int> processedItems = new BlockingCollection<int>(); 
     ManualResetEvent completed = new ManualResetEvent(false); 
     DateTime last = DateTime.UtcNow; 
     observable 
      // this is so we'll receive exceptions 
      .ObserveOn(new SynchronizationContext()) 
      .Subscribe(item => 
       { 
        if (item == 1) 
         last = DateTime.UtcNow; 
        else 
        { 
         TimeSpan diff = (DateTime.UtcNow - last); 
         last = DateTime.UtcNow; 

         Assert.InRange(diff.TotalMilliseconds, 
          during.TotalMilliseconds - 30, 
          during.TotalMilliseconds + 30); 
        } 
        processedItems.Add(item); 
       }, 
       () => completed.Set() 
      ); 
     completed.WaitOne(); 
     Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>()); 
    } 
} 
+0

URLに何か問題が発生しました –

-1

私の元の記事では、クライアントビヘイビア拡張を介してWCFにスロットリングメカニズムを追加する方法について説明しましたが、質問を誤解していると指摘されました(doh!)。

全体として、レート制限に違反しているかどうかを判断するクラスをチェックすることができます。料金違反を確認する方法については、すでに多くの議論があります。あなたはレート制限に違反している場合

Throttling method calls to M requests in N seconds

、その後、修正間隔寝て、もう一度確認してください。そうでない場合は、HttpWebRequest呼び出しを行います。

+0

、私は、WCFのWebサービスを参照するわけではありません。これは単純なHttpWebRequestクラスの使用方法です。 –

+0

ああ、それは遅れて、私はより密接に質問を読んでいるはずです:) HttpWebRequestを呼び出す前に、別のクラスで80リクエスト/秒のレートに違反しないことを確認してください。上記のコードを更新します。 –

+0

それはC#ではなくJavaを要求しました。 – SmallChess

0

Throttle()およびSample()拡張メソッド(On Observable)を使用すると、イベントの高速シーケンスを「より遅い」シーケンスに調整できます。

Here is a blog post with an exampleSample(Timespan)は、最大レートを保証します。

+0

Sample()とThrottle()の問題は、指定されたレートを達成するためにサンプルをスキップ/スローすることです。 – georgiosd

関連する問題