2017-06-09 23 views
1

私はvoid Actionを任意のスレッドのキューに入れることができるクラスまたはパターンを探しています。キュー内のアクションを1つずつ呼び出す別のスレッドがあります。 1つのアクションが一度に実行され、キューの順序が保持されます。別のスレッドで順番に処理する処理

すでにアクション内にlockという名前でそれを実行しようとしましたが、それは注文を保証しないようです。

+4

はTPLデータフローやそれだけのベースBlockingCollectionを探してください –

答えて

1

ここでは、多かれ少なかれあなたがしたいと思う簡単な例のクラスです。下のコードはテストしていませんが、COMアパートメントモデル間でFunc<...>Action<...>を呼び出すのに使用するはるかに複雑なバージョンに基づいていますが、これはうまく機能します。それにかかわらず、私はこれがあなたを十分に近づけて、あなたの問題を解決する一つの方法を見るのを助けることを願っています。

以下のクラスは、Actionのアイテムを受け入れてキューに入れ、実行する作業があれば1つずつ実行します。 lockブロックとSystem.Collections.Generic.Queue<T>ブロックを代わりに使用すると、System.Collections.Concurrent.ConcurrentQueue<T>が利用できない以前のバージョンのC#でこの作業を行うことができます。私は、C#の他のバージョンとの互換性を容易にするために、新しいイベントのいくつかを使用しないことにしました。

あなたもAction<TArg>と基本クラスを書いて、派生クラス自身が保持してサポートしたいAction<...>の各バリエーションのためのクラスを派生する必要Action<TArg1, TArg2, ...>のようなそのバリエーションのすべてを使用する場合は、クラスを適応させる方法がありますAction<...>を実行し、スレッドに対しても実行します。

クラスはスレッドセーフなので、実行中のスレッドからActionを追加できます。私はこれも決して列にイベントを残して寝るという点では安全だと信じていますが、私はそれを証明することはできません。キューの長さを確認するのではなく、Interlocked.IncrementInterlocked.Decrement、およびVolatile.Readという別のアイテムカウンタを使用できますが、キューアイテムを追加してカウンタを増やすときは、操作の順序に注意する必要があります。

sealed class ActionRunner : IDisposable 
{ 
    private ConcurrentQueue<Action> m_queue; 
    private Thread m_workThread; 
    private AutoResetEvent m_threadHasWorkEvent; 
    private ManualResetEvent m_killThreadEvent; 

    public ActionRunner() 
    { 
     m_queue = new ConcurrentQueue<Action>(); 
     m_threadHasWorkEvent = new AutoResetEvent(false); 
     m_killThreadEvent = new ManualResetEvent(false); 
     m_workThread = new Thread(new ParameterizedThreadStart(ActionRunnerThread)); 
     m_workThread.IsBackground = true; 
     m_workThread.Start(); 
    } 

    private bool disposedValue = false; 

    private void Dispose(bool disposing) 
    { 
     if(!disposedValue) 
     { 
      if(m_threadKillEvent != null) 
      { 
       m_threadKillEvent.Set(); 
      } 

      if(m_queue != null) 
      { 
       Action dummy; 
       while(m_queue.TryDequeue(out dummy)) 
        ; 
      } 

      if(m_workThread != null) 
      { 
       m_workThread.Join(); 
      } 

      disposedValue = true; 
     } 
    } 

    ~ActionRunner() 
    { 
     Dispose(false); 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    public void AddAction(Action toExecute) 
    { 
     m_queue.Push(toExecute); 
     m_threadHasWorkEvent.Set(); 
    } 

    private void ActionRunnerThread(object o) 
    { 
     WaitHandle[] waitList = new WaitHandle[] { m_killThreadEvent, m_threadHasWorkEvent }; 

     while(true) 
     { 
      int which = WaitHandle.WaitAny(waitList); 
      if(which == 0) 
      { 
       break; 
      } 

      while(m_queue.Count > 0) 
      { 
       Action toExecute; 
       if(!m_queue.TryDequeue(out toExecute)) 
       { 
        continue; 
       } 

       toExecute(); 
      } 
     } 
    } 
} 

使用法:

ActionRunner myRunner = new ActionRunner(); 

// void DoWorkHere(); is defined elsewhere 
myRunner.AddAction(DoWorkHere); 

// This also works 
myRunner.AddAction(() => DoWorkHere()); 

// This works too, but you have to be very careful of closures 
// and properly capturing the values of x and y. For example, 
// if you change x and y to something else after adding the action, 
// there's a good chance the printed result will use the updated 
// values and not the original ones. 
int x = 5, y = 2; 
myRunner.AddAction(() => 
{ 
    // Assuming string AddTwoNumbers(int x, int y) { ... } 
    System.Diagnostics.Debug.WriteLine(AddTwoNumbers(x, y)); 
}); 

// Here's a safer way to do the above without worrying about the 
// values being changed. There are many ways to do this with cleaner 
// syntax, but this is an easy one. 
int x = 5, y = 2; 
{ 
    // This copy happens within its own scope and will properly 
    // capture the values when used in the closure. 
    int localX = x, localY = y; 
    myRunner.AddAction(() => 
    { 
     System.Diagnostics.Debug.WriteLine(AddTwoNumbers(localX, localY)); 
    }); 
} 

// When you're done, Dispose the class so it stops running the thread 
myRunner.Dispose(); 
0

あなたはその後、同時実行を制限TaskSchedulerの独自のバージョンを作成することができTPLを使用している場合は、あなただけで各タスクをキューに入れることができるのだろう同じスケジューラ。

static void Main() 
    { 
     LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1); 
     TaskFactory factory = new TaskFactory(lcts); 

     factory.StartNew(()=> 
      { 
       for (int i = 0; i < 500; i++) 
       { 
        Console.Write("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId); 
       } 
      } 
     ); 

     Console.ReadKey(); 
    } 

う - https://msdn.microsoft.com/en-us/library/ee789351(v=vs.100).aspx

関連する問題