2009-08-20 2 views
1

一つの方法は、プロセス間(名前付き)パイプを介してです。がたTextWriterを介して通信 - プロセス間通信を有することが>たTextReader

は、私は2つのスレッド間通信様と同じ「キュー」をachiveたいです。プロデューサは、テキストベースコマンド(TextWriterまたは出力ストリームを使用)を記述する必要があります。 コンシューマはTextReaderから読み取る必要があります。だからあなたはそれについて考えるので、OutputStream/-WriterはInputStream/-Readerのコインのもう片側です。だから、Writerを使ってReaderをデータで埋めることは、理論的には簡単なはずです。

(ここで標準的な方法はスレッド間にキューを置くことですが、私はTextReaderとTextWriterを使用したいと思います。フロントエンドとバックエンドの既存のコードがすでに存在するからです。プロデューサ/コンシューマへ/ Console.Out .IN。)

私はこれがちょうど読者にライターを接続して本当に簡単だろうと思ったが、私はそれを行う方法を見つけることができません。

私は、1つは自分自身を接続書くことができますが、それはすでに存在し、「必要がある」のようにそれは感じています。

アイデア?

乾杯 レイフ

+0

あなただけの追加とキューから削除するにはイベントを発生し、1つのスレッドでキューに入れ、他に取ることができ、スレッドセーフなキューを作成することができ、あなたがやりたいことには多くの方法があり、そう説明してくださいより詳細なあなたの要件 –

答えて

1

私は、「既製」解決策を見出すあきらめました。私は自分自身を書いた。それに書くエンドだ 新しいクラスThroughputStreamは、データを受信し、から読むために受信したデータ・チャンクを使用する読み取りエンドにスレッドセーフなキューを介してそれらをポストします。

namespace My.IO 
{ 
    public class ThrouputStream 
    { 
     private InputStreamClass inputStream; 
     private OutputStreamClass outputStream; 

     private Queue<byte[]> queue = new Queue<byte[]>(); 
     private System.Threading.EventWaitHandle queueEvent = new System.Threading.EventWaitHandle(false, System.Threading.EventResetMode.AutoReset); 

     public ThrouputStream() 
     { 
      inputStream = new InputStreamClass(this); 
      outputStream = new OutputStreamClass(this); 
     } 

     public Stream InputStream 
     { 
      get { return inputStream; } 
     } 

     public Stream OutputStream 
     { 
      get { return outputStream; } 
     } 

     private class InputStreamClass : Stream 
     { 
      private readonly Queue<byte[]> queue; 
      private readonly ThrouputStream parent; 
      private byte[] currentBlock = null; 
      private int currentBlockPos = 0; 
      private Boolean closed = false; 
      private int readTimeoutMs = System.Threading.Timeout.Infinite; 

      public InputStreamClass(ThrouputStream parent) 
      { 

       this.parent = parent; 
       this.queue = parent.queue; 
      } 

      public override bool CanRead 
      { 
       get { return true; } 
      } 

      public override bool CanSeek 
      { 
       get { return false; } 
      } 

      public override bool CanWrite 
      { 
       get { return false; } 
      } 

      public override void Flush() 
      { 
       // Do nothing, always flushes. 
      } 

      public override long Length 
      { 
       get { throw new NotSupportedException(); } 
      } 

      public override long Position 
      { 
       get 
       { 
        throw new NotSupportedException(); 
       } 
       set 
       { 
        throw new NotSupportedException(); 
       } 
      } 

      public override bool CanTimeout 
      { 
       get 
       { 
        return true; 
       } 
      } 

      public override int ReadTimeout 
      { 
       get 
       { 
        return readTimeoutMs; 
       } 
       set 
       { 
        readTimeoutMs = value; 
       } 
      } 

      public override int Read(byte[] buffer, int offset, int count) 
      { 
       if (currentBlock == null) 
       { 
        int queueCount; 
        lock (queue) 
        { 
         queueCount = queue.Count; 
         if (queueCount > 0) 
          currentBlock = queue.Dequeue(); 
        } 

        if (currentBlock == null && !parent.outputStream.IsClosed) 
        { 
         parent.queueEvent.WaitOne(readTimeoutMs); 

         lock (queue) 
         { 
          if (queue.Count == 0) 
           return 0; 

          currentBlock = queue.Dequeue(); 
         } 
        } 

        currentBlockPos = 0; 
       } 

       if (currentBlock == null) 
        return 0; 

       int read = Math.Min(count, currentBlock.Length - currentBlockPos); 
       Array.Copy(currentBlock, currentBlockPos, buffer, offset, read); 
       currentBlockPos += read; 
       if (currentBlockPos == currentBlock.Length) 
       { 
        // did read whole block 
        currentBlockPos = 0; 
        currentBlock = null; 
       } 

       return read; 
      } 

      public override long Seek(long offset, SeekOrigin origin) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void SetLength(long value) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void Write(byte[] buffer, int offset, int count) 
      { 
       throw new NotImplementedException(); 
      } 

      public override void Close() 
      { 
       this.closed = true; 
       base.Close(); 
      } 
     } 

     private class OutputStreamClass : Stream 
     { 
      private bool isClosed = false; 

      private readonly Queue<byte[]> queue; 
      private readonly ThrouputStream parent; 

      public OutputStreamClass(ThrouputStream parent) 
      { 
       this.parent = parent; 
       this.queue = parent.queue; 
      } 

      public override bool CanRead 
      { 
       get { return false; } 
      } 

      public override bool CanSeek 
      { 
       get { return false; } 
      } 

      public override bool CanWrite 
      { 
       get { return true; } 
      } 

      public override void Flush() 
      { 
       // always flush 
      } 

      public override long Length 
      { 
       get { throw new NotSupportedException(); } 
      } 

      public override long Position 
      { 
       get 
       { 
        throw new NotSupportedException(); 
       } 
       set 
       { 
        throw new NotSupportedException(); 
       } 
      } 

      public override int Read(byte[] buffer, int offset, int count) 
      { 
       throw new NotSupportedException(); 
      } 

      public override long Seek(long offset, SeekOrigin origin) 
      { 
       throw new NotSupportedException(); 
      } 

      public override void SetLength(long value) 
      { 
       throw new NotSupportedException(); 
      } 

      public override void Write(byte[] buffer, int offset, int count) 
      { 
       byte[] copy = new byte[count]; 
       Array.Copy(buffer, offset, copy, 0, count); 
       lock (queue) 
       { 
        queue.Enqueue(copy); 
        try 
        { 
         parent.queueEvent.Set(); 
        } 
        catch (Exception) 
        { } 
       } 
      } 

      public override void Close() 
      { 
       this.isClosed = true; 
       base.Close(); 

       // Signal event, to stop waiting consumer 
       try 
       { 
        parent.queueEvent.Set(); 
       } 
       catch (Exception) 
       { } 
      } 

      public bool IsClosed 
      { 
       get { return isClosed; } 
      } 
     } 

    } 
} 
2

ストリームとTextWriter/TextReaderをスレッド間の有効な通信手段として使用することをお勧めします。それぞれの "キュー"に対して1つのストリームが必要で、有効なデータが完全に書き込まれたり読み取られたりするためには、書き込みまたは読み取り操作ごとにそのストリームをロックする必要があります。よりよい解決策は、おそらくそうのようになります。

はカップルのManualResetEventsとともに、文字列型のキューを設定します。一般的な考え方は、スレッドシグナリングを使用して、2つのスレッドがロックを必要とせずに通信できるようにすることです。

public static class ThreadTest 
{ 
    public void Main() 
    { 
     long exit = 0; 

     Queue<string> messages = new Queue<string>(); 
     ManualResetEvent signal1 = new ManualResetEvent(); 
     ManualResetEvent signal2 = new ManualResetEvent(); 

     signal2.Set(); 

     Thread writer = new Thread(() => 
     { 
      while (exit == 0) 
      { 
       string value = Console.ReadLine(); 
       if (value == "exit") 
       { 
        Interlocked.Exchange(ref exit, 1); 
       } 
       else 
       { 
        messages.Enqueue(value); 
        Console.WriteLine("Written: " + value); 
        signal1.Set(); 
       } 

       signal2.WaitOne(); 
      } 
     }); 

     Thread reader = new Thread(() => 
     { 
      while (exit == 0) 
      { 
       signal1.WaitOne(); 
       signal2.Reset(); 

       value = messages.Dequeue(); 
       Console.WriteLine("Read: " + value); 

       signal2.Set(); 
       signal1.Reset(); 
      } 
     }); 

     reader.Start(); 
     writer.Start(); 
    } 
} 
+0

私は完全に同意する! ストリームを使用して2つのスレッド間で通信することは、一般的には悪い考えです。 私の実験では、コンソールプログラムのベースとなる既存のコードを "ユニットテスト"するためにそれを使用する意向がありました。 – leiflundgren

関連する問題