2011-01-12 9 views
25

でマルチスレッド私はリスナーがあります。純HttpListenerを

listener = new HttpListener(); 
listener.Prefixes.Add(@"http://+:8077/"); 
listener.Start(); 
listenerThread = new Thread(HandleRequests); 
listenerThread.Start(); 

を私は要求を処理しています:、私は、このような方法でvoid Stop()を書きたい

private void HandleRequests() 
{ 
    while (listener.IsListening) 
    { 
     var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener); 
     context.AsyncWaitHandle.WaitOne(); 
    } 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 
} 

こと:

  1. 現在処理されているすべてのリクエストが終了するまでブロックされます(つまり、すべてのスレッドが「何かをする」のを待つ)。
  2. 既に開始されているリクエストを待つ間、それ以上のリクエストは許可されません(すなわち、ListenerCallbackの先頭に戻る)。
  3. その後、listener.Stop()listener.IsListeningはfalseになります)を呼び出します。

どのように書き込むことができますか?

EDIT:この解決策についてどう思いますか?それは安全ですか?

public void Stop() 
{ 
    lock (this) 
    { 
     isStopping = true; 
    } 
    resetEvent.WaitOne(); //initially set to true 
    listener.Stop(); 
} 

private void ListenerCallback(IAsyncResult ar) 
{ 
    lock (this) 
    { 
     if (isStopping) 
      return; 

     resetEvent.Reset(); 
     numberOfRequests++; 
    } 

    var listener = ar.AsyncState as HttpListener; 

    var context = listener.EndGetContext(ar); 

    //do some stuff 

    lock (this) 
    { 
     if (--numberOfRequests == 0) 
      resetEvent.Set(); 
    } 
} 

答えて

2

私は編集で私のコードを相談してきました私の質問の一部であり、私はいくつかの変更を加えてそれを受け入れることに決めました:

0

単純にlistener.Stop()を呼び出すと、このトリックを行う必要があります。これにより、すでに確立されている接続は終了しませんが、新しい接続ができなくなります。

+1

これは当てはまりません。 'ListenerCallback'の実行中に' listener.Stop() 'を呼び出すと例外が発生します。 'EndGetContext'を呼び出すとき、または出力ストリームを使用するときにはそれを後で呼び出します。私はもちろん、例外をキャッチすることができますが、私はしたくないです。 – prostynick

+0

私のコードでは、私はフラグを使用し、それを停止した後にはリスナーをもう参照しませんが、リスナーを閉じると、受け入れられた接続は閉じず、リスナーだけを閉じません。 –

+0

「私は旗を使う」と言ってどういう意味なのか分かりません。問題は、 'ListenerCallback'でリスナーを使用していて、別のスレッドが閉じていると、私が使用している間に、私が言及した例外に終わることになります。 – prostynick

4

これを解決するいくつかの方法があります...これは、進行中の作業を追跡するためのセマフォと、すべての作業者が終了したときに発生するシグナルを使用する単純な例です。これはあなたに働く基本的な考えを与えるはずです。

以下の解決策は理想的ではありません。理想的にはBeginGetContextを呼び出す前にセマフォを取得する必要があります。そのため、シャットダウンがより困難になるので、私はこのより簡略化したアプローチを採用しました。私が '本当の'のためにこれをやっていたのなら、おそらくThreadPoolに頼るのではなく、私自身のスレッド管理を書くだろう。これにより、より信頼性の高いシャットダウンが可能になります。ここでは、完全性のために

class TestHttp 
{ 
    static void Main() 
    { 
     using (HttpServer srvr = new HttpServer(5)) 
     { 
      srvr.Start(8085); 
      Console.WriteLine("Press [Enter] to quit."); 
      Console.ReadLine(); 
     } 
    } 
} 


class HttpServer : IDisposable 
{ 
    private readonly int _maxThreads; 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly ManualResetEvent _stop, _idle; 
    private readonly Semaphore _busy; 

    public HttpServer(int maxThreads) 
    { 
     _maxThreads = maxThreads; 
     _stop = new ManualResetEvent(false); 
     _idle = new ManualResetEvent(false); 
     _busy = new Semaphore(maxThreads, maxThreads); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     _idle.Reset(); 

     //aquire and release the semaphore to see if anyone is running, wait for idle if they are. 
     _busy.WaitOne(); 
     if(_maxThreads != 1 + _busy.Release()) 
      _idle.WaitOne(); 

     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ListenerCallback, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ListenerCallback(IAsyncResult ar) 
    { 
     _busy.WaitOne(); 
     try 
     { 
      HttpListenerContext context; 
      try 
      { context = _listener.EndGetContext(ar); } 
      catch (HttpListenerException) 
      { return; } 

      if (_stop.WaitOne(0, false)) 
       return; 

      Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl); 
      context.Response.SendChunked = true; 
      using (TextWriter tw = new StreamWriter(context.Response.OutputStream)) 
      { 
       tw.WriteLine("<html><body><h1>Hello World</h1>"); 
       for (int i = 0; i < 5; i++) 
       { 
        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now); 
        tw.Flush(); 
        Thread.Sleep(1000); 
       } 
       tw.WriteLine("</body></html>"); 
      } 
     } 
     finally 
     { 
      if (_maxThreads == 1 + _busy.Release()) 
       _idle.Set(); 
     } 
    } 
} 
56

独自のワーカースレッドを管理する場合、それがどのように見えるかです::とにかくここ

は完全な例である

class HttpServer : IDisposable 
{ 
    private readonly HttpListener _listener; 
    private readonly Thread _listenerThread; 
    private readonly Thread[] _workers; 
    private readonly ManualResetEvent _stop, _ready; 
    private Queue<HttpListenerContext> _queue; 

    public HttpServer(int maxThreads) 
    { 
     _workers = new Thread[maxThreads]; 
     _queue = new Queue<HttpListenerContext>(); 
     _stop = new ManualResetEvent(false); 
     _ready = new ManualResetEvent(false); 
     _listener = new HttpListener(); 
     _listenerThread = new Thread(HandleRequests); 
    } 

    public void Start(int port) 
    { 
     _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port)); 
     _listener.Start(); 
     _listenerThread.Start(); 

     for (int i = 0; i < _workers.Length; i++) 
     { 
      _workers[i] = new Thread(Worker); 
      _workers[i].Start(); 
     } 
    } 

    public void Dispose() 
    { Stop(); } 

    public void Stop() 
    { 
     _stop.Set(); 
     _listenerThread.Join(); 
     foreach (Thread worker in _workers) 
      worker.Join(); 
     _listener.Stop(); 
    } 

    private void HandleRequests() 
    { 
     while (_listener.IsListening) 
     { 
      var context = _listener.BeginGetContext(ContextReady, null); 

      if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle })) 
       return; 
     } 
    } 

    private void ContextReady(IAsyncResult ar) 
    { 
     try 
     { 
      lock (_queue) 
      { 
       _queue.Enqueue(_listener.EndGetContext(ar)); 
       _ready.Set(); 
      } 
     } 
     catch { return; } 
    } 

    private void Worker() 
    { 
     WaitHandle[] wait = new[] { _ready, _stop }; 
     while (0 == WaitHandle.WaitAny(wait)) 
     { 
      HttpListenerContext context; 
      lock (_queue) 
      { 
       if (_queue.Count > 0) 
        context = _queue.Dequeue(); 
       else 
       { 
        _ready.Reset(); 
        continue; 
       } 
      } 

      try { ProcessRequest(context); } 
      catch (Exception e) { Console.Error.WriteLine(e); } 
     } 
    } 

    public event Action<HttpListenerContext> ProcessRequest; 
} 
+0

これはすばらしいことです。これは、HttpListenerのスループットをテストするための素晴らしい候補となります。 – Jonno

+0

ありがとうございました! 2つの小さな問題があります。1. ProcessRequestがnullの可能性があります。2. HttpListenerContextは静的でなければスレッドセーフではありません。 –

+0

@MartinMeeserがコメントに感謝します。 try catchブロックにラップするのではなく、この 'ProcessRequest?.Invoke(context);'を使うことができます。しかし、もし静的がオプションではないなら、何をお勧めしますか? – JohnTube

0

これはBlockingCollection型付きキューを使用して要求を処理します。そのまま使えます。このクラスからクラスを派生させ、Responseをオーバーライドする必要があります。

using System; 
using System.Collections.Concurrent; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class HttpServer : IDisposable 
    { 
     private HttpListener httpListener; 
     private Thread listenerLoop; 
     private Thread[] requestProcessors; 
     private BlockingCollection<HttpListenerContext> messages; 

     public HttpServer(int threadCount) 
     { 
      requestProcessors = new Thread[threadCount]; 
      messages = new BlockingCollection<HttpListenerContext>(); 
      httpListener = new HttpListener(); 
     } 

     public virtual int Port { get; set; } = 80; 

     public virtual string[] Prefixes 
     { 
      get { return new string[] {string.Format(@"http://+:{0}/", Port)}; } 
     } 

     public void Start(int port) 
     { 
      listenerLoop = new Thread(HandleRequests); 

      foreach(string prefix in Prefixes) httpListener.Prefixes.Add(prefix); 

      listenerLoop.Start(); 

      for (int i = 0; i < requestProcessors.Length; i++) 
      { 
       requestProcessors[i] = StartProcessor(i, messages); 
      } 
     } 

     public void Dispose() { Stop(); } 

     public void Stop() 
     { 
      messages.CompleteAdding(); 

      foreach (Thread worker in requestProcessors) worker.Join(); 

      httpListener.Stop(); 
      listenerLoop.Join(); 
     } 

     private void HandleRequests() 
     { 
      httpListener.Start(); 
      try 
      { 
       while (httpListener.IsListening) 
       { 
        Console.WriteLine("The Linstener Is Listening!"); 
        HttpListenerContext context = httpListener.GetContext(); 

        messages.Add(context); 
        Console.WriteLine("The Linstener has added a message!"); 
       } 
      } 
      catch(Exception e) 
      { 
       Console.WriteLine (e.Message); 
      } 
     } 

     private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Thread thread = new Thread(() => Processor(number, messages)); 
      thread.Start(); 
      return thread; 
     } 

     private void Processor(int number, BlockingCollection<HttpListenerContext> messages) 
     { 
      Console.WriteLine ("Processor {0} started.", number); 
      try 
      { 
       for (;;) 
       { 
        Console.WriteLine ("Processor {0} awoken.", number); 
        HttpListenerContext context = messages.Take(); 
        Console.WriteLine ("Processor {0} dequeued message.", number); 
        Response (context); 
       } 
      } catch { } 

      Console.WriteLine ("Processor {0} terminated.", number); 
     } 

     public virtual void Response(HttpListenerContext context) 
     { 
      SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>")); 
     } 

     public static void SendReply(HttpListenerContext context, StringBuilder responseString) 
     { 
      byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString()); 
      context.Response.ContentLength64 = buffer.Length; 
      System.IO.Stream output = context.Response.OutputStream; 
      output.Write(buffer, 0, buffer.Length); 
      output.Close(); 
     } 
    } 
} 

これは使用方法のサンプルです。イベントやロックブロックを使用する必要はありません。 BlockingCollectionはこれらの問題をすべて解決します。

using System; 
using System.Collections.Concurrent; 
using System.IO; 
using System.Net; 
using System.Text; 
using System.Threading; 

namespace Service 
{ 
    class Server 
    { 
    public static void Main (string[] args) 
    { 
     HttpServer Service = new QuizzServer (8); 
     Service.Start (80); 
     for (bool coninute = true; coninute ;) 
     { 
      string input = Console.ReadLine().ToLower(); 
      switch (input) 
      { 
       case "stop": 
        Console.WriteLine ("Stop command accepted."); 
        Service.Stop(); 
        coninute = false; 
        break; 
       default: 
        Console.WriteLine ("Unknown Command: '{0}'.",input); 
        break; 
      } 
     } 
    } 
    } 
} 
関連する問題