2010-12-02 10 views
11

タスク並列ライブラリを使用する前に、私はしばしばCorrelationManager.ActivityIdを使用して、複数のスレッドによるトレース/エラー報告を追跡しました。タスク並列ライブラリのタスクはActivityIDにどのように影響しますか?

ActivityIdはスレッドローカルストレージに格納されるため、各スレッドは独自のコピーを取得します。アイデアは、スレッド(アクティビティ)を起動するときに、新しいActivityIdを割り当てることです。 ActivityIdは他のトレース情報とともにログに書き込まれ、単一の「アクティビティ」のトレース情報を単一にすることができます。これは、ActivityIdをサービスコンポーネントに持ち越すことができるため、WCFでは本当に便利です。

は、ここで私が話しているものの例である:今

static void Main(string[] args) 
{ 
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) => 
    { 
     DoWork(); 
    })); 
} 

static void DoWork() 
{ 
    try 
    { 
     Trace.CorrelationManager.ActivityId = Guid.NewGuid(); 
     //The functions below contain tracing which logs the ActivityID. 
     CallFunction1(); 
     CallFunction2(); 
     CallFunction3(); 
    } 
    catch (Exception ex) 
    { 
     Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString()); 
    } 
} 

、TPLで、私の理解では、複数のタスクを共有するスレッドです。これは、ActivityIdが中間タスクを(別のタスクによって)再初期化する傾向があることを意味しますか?アクティビティのトレースに対処する新しいメカニズムはありますか?

+0

私は何も提供する必要はありませんが、私はこの問題にも興味があります。 Trace.CorrelationManagerがActivityIdとLogicalOperationStackを格納するために使用する技術であるため、CallContext.LogicalSetDataを使用して設定された情報にも同じ質問が当てはまると思われます。 – wageoghe

+0

@wageohe - 私は今日、この今日のテストに周りに行って、私の結果を投稿しました:) –

+0

私は私の答えにいくつかの詳細を掲載しました。私はまた、ここで別の答えへのリンクを投稿しました。ここで私がここで尋ねた新しい質問と、マイクロソフトのParallel Extensionsフォーラムで質問しました(しかし、まだ1/21/2011の回答は得られていません) 。たぶん、情報が役に立つかもしれません。興味深い結果。 – wageoghe

答えて

6

いくつかの実験を実行したところ、私の質問では、TPLで作成された複数のタスクが同時に同じスレッド上で実行されないという誤った仮定があることが判明しました。

ThreadLocalStorageは、一度に1つのタスクでしかスレッドを使用できないため、.NET 4.0のTPLでは安全に使用できます。

のタスクが同時に私はDotNetRocks(申し訳ありませんが、私はそれがあった示した覚えていない)におよそ#5.0 Cを聞いたインタビューに基づいていたスレッドを共有することができます仮定 - だから、私の質問は、(かない場合があります)すぐに関連するようになる。

私の実験ではいくつかのタスクが開始され、実行されたタスクの数、所要時間、消費されたスレッドの数が記録されます。もし誰かがそれを繰り返したいのならば、コードは以下の通りです。

class Program 
{ 
    static void Main(string[] args) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 
     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
      task = Task.Factory.StartNew(() => 
      { 
       DoLongRunningWork(); 
      }, taskCreationOpt); 

      allTasks[i] = task; 
     } 

     Task.WaitAll(allTasks); 
     stopwatch.Stop(); 

     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.ReadKey(); 
    } 


    private static List<int> threadIds = new List<int>(); 
    private static object locker = new object(); 
    private static void DoLongRunningWork() 
    { 
     lock (locker) 
     { 
      //Keep a record of the managed thread used. 
      if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
       threadIds.Add(Thread.CurrentThread.ManagedThreadId); 
     } 
     Guid g1 = Guid.NewGuid(); 
     Trace.CorrelationManager.ActivityId = g1; 
     Thread.Sleep(3000); 
     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 
    } 
} 

出力(もちろん、これはマシンに依存します)したTaskCreationOptions.LongRunningにtaskCreationOptを変更

Completed 100 tasks in 23097 milliseconds 
Used 23 threads 

は異なる結果を与えた:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads 
+0

あなたのコードに基づいたテストプログラムを使って、Trace.CorrelationManager.ActivityIdについて興味深いものを見つけました。あなたのコードを多かれ少なかれ使用して、Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperationを使って、私は "良い"結果を得ることができます。つまり、StartLogicalOperation/StopLogicalOperationを使用して(デリゲート内の)各タスクをデモンストレーションしてブラケット処理する方法でTasksを使用すると、LogicalOperationStackは常に同期しているように見えます。ただし、Parallel.Forを使用すると、結果が悪化する可能性があります。それはコードがあるので私は答えとして私のテストを投稿します – wageoghe

+0

偉大な答え;投稿していただきありがとうございます。ただし、 'List threadIds'フィールドは、同期されていないジェネリックリスト(例えば、taskIdをthreadIdに関連付けるか、それを並行ハッシュテーブルとして使用する場合など)ではなく、ConcurrentBagまたはConcurrentDictionaryでなければなりません.valueを無視してください)。 –

3

私の投稿これを許してください。あなたの質問に本当に答えるものではないので答えとして、CorrelationManagerの振る舞いとスレッド/タスク/ etcを扱うので、あなたの質問に関連しています。私はCorrelationManagerのLogicalOperationStack(およびStartLogicalOperation/StopLogicalOperationメソッド)を使用して、マルチスレッドシナリオで追加のコンテキストを提供する方法を検討してきました。

Parallel.Forを使用して作業を並行して実行する機能を追加するために、私はあなたの例をとり、わずかに変更しました。また、StartLogicalOperation/StopLogicalOperationを使用して(内部で)DoLongRunningWorkを括弧で括っています。概念的には、DoLongRunningWorkは、それが実行されるたびに、このような何かを行います。

DoLongRunningWork 
    StartLogicalOperation 
    Thread.Sleep(3000) 
    StopLogicalOperation 

を私は(あるとして多かれ少なかれ)あなたのコードにこれらの論理演算を追加した場合、論理operatinsのすべてが(同期に残ることを私が発見しました常にスタック上で期待されるオペレーションの数とスタック上のオペレーションの値は常に期待通りです)。

私自身のテストでは、これは必ずしもそうではないことがわかりました。論理演算スタックが壊れていました。私が思いつくことができる最も良い説明は、 "子"スレッドが終了したときにCallContext情報を "親"スレッドコンテキストに戻す "合併"が、 "古い"子スレッドコンテキスト情報(論理演算)を "別の兄弟の子スレッドによって継承されます。

この問題は、Parallel.Forが明らかに "ワーカースレッド"の1つとしてメインスレッド(少なくともサンプルコードでは書かれているように)を使用しているという事実に関連している可能性がありますパラレルドメイン)。 DoLongRunningWorkが実行されるたびに、新しい論理操作が(開始時に)開始され、終了時に停止されます(つまり、LogicalOperationStackにプッシュされ、それからポップされます)。メインスレッドにすでに有効な論理演算があり、DoLongRunningWorkがMAIN THREADで実行すると、新しい論理演算が開始され、メインスレッドのLogicalOperationStackはTWO演算を持つようになります。 DoLongRunningWorkの後続の実行(DoLongRunningWorkのこの「反復」がメインスレッド上で実行されている限り)は、(明らかに)メインスレッドのLogicalOperationStackを継承します(これは、期待される操作ではなく2つの操作を持ちます)。

LogicalOperationStackの動作が、私の変更したバージョンの例とは違っていたのは、長い時間がかかりました。最後に私のコードでは、プログラム全体を論理演算で囲んだのに対し、テストプログラムの修正版ではそうではありませんでした。私のテストプログラムでは、「仕事」が実行されるたびに(DoLongRunningWorkに類似している)、すでに論理的な操作が有効であったということです。あなたのテストプログラムの私の修正版では、私は論理的な操作でプログラム全体を括弧で囲まなかった。

私は、Parallel.Forを使用している場合、プログラム全体を論理演算で囲むようにテストプログラムを変更しましたが、まったく同じ問題が発生しました。上記の概念モデルを使用して

が、これは正常に実行されます:

Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 

これは最終的に明らかに同期LogicalOperationStackの外に起因して主張しますが:ここ

StartLogicalOperation 
Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 
StopLogicalOperation 

は私のサンプルプログラムです。 ActivityIdとLogicalOperationStackを操作するDoLongRunningWorkメソッドを持っている点で、あなたと似ています。私はDoLongRunningWorkの2つの味があります。 1つのフレーバは、Parallel.Forを使用するタスクを使用します。並列処理全体が論理演算で囲まれるように各フレーバを実行することもできます。したがって、並列操作を実行するための合計4つの方法があります。それぞれを試すには、目的の "使用..."メソッドのコメントを外し、再コンパイルして実行します。 UseTasks,UseTasks(true)、およびUseParallelForはすべて完了まで実行する必要があります。 UseParallelFor(true)は、LogicalOperationStackに予想されるエントリ数がないため、ある時点でアサートします。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Diagnostics; 
using System.Threading; 
using System.Threading.Tasks; 

namespace CorrelationManagerParallelTest 
{ 
    class Program 
    {  
    static void Main(string[] args)  
    { 
     //UseParallelFor(true) will assert because LogicalOperationStack will not have expected 
     //number of entries, all others will run to completion. 

     UseTasks(); //Equivalent to original test program with only the parallelized 
         //operation bracketed in logical operation. 
     ////UseTasks(true); //Bracket entire UseTasks method in logical operation 
     ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For 
          //rather than Tasks. Bracket only the parallelized 
          //operation in logical operation. 
     ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation 
    }  

    private static List<int> threadIds = new List<int>();  
    private static object locker = new object();  

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; 

    private static int mainThreadUsedInDelegate = 0; 

    // baseCount is the expected number of entries in the LogicalOperationStack 
    // at the time that DoLongRunningWork starts. If the entire operation is bracketed 
    // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, 
    // it will be 0. 
    private static void DoLongRunningWork(int baseCount)  
    { 
     lock (locker) 
     { 
     //Keep a record of the managed thread used.    
     if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
      threadIds.Add(Thread.CurrentThread.ManagedThreadId); 

     if (Thread.CurrentThread.ManagedThreadId == mainThreadId) 
     { 
      mainThreadUsedInDelegate++; 
     } 
     }   

     Guid lo1 = Guid.NewGuid(); 
     Trace.CorrelationManager.StartLogicalOperation(lo1); 

     Guid g1 = Guid.NewGuid();   
     Trace.CorrelationManager.ActivityId = g1; 

     Thread.Sleep(3000);   

     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 

     //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation 
     //in effect when the Parallel.For operation was started. 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); 

     Trace.CorrelationManager.StopLogicalOperation(); 
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
     task = Task.Factory.StartNew(() => 
     { 
      DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }, taskCreationOpt); 
     allTasks[i] = task; 
     } 
     Task.WaitAll(allTasks); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    private static void UseParallelFor(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Parallel.For(0, totalThreads, i => 
     { 
     DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    } 
} 

この全体のLogicalOperationStackはParallel.Forで使用することができる場合の問題(および/または他のスレッド/タスクの構築)、またはどのようにそれがメリットに、おそらく、独自の質問を使用することができます。たぶん私は質問を投稿します。その間、私はあなたにこれについての考えがあるかどうか疑問に思います(または、ActivityIdが安全であると思われるのでLogicalOperationStackの使用を検討していたのだろうか)。

[EDIT]

様々なスレッド/ ThreadPoolの/タスク/パラレルcontstructsの一部でLogicalOperationStackおよび/またはCallContext.LogicalSetDataの使用方法の詳細については、this questionに私の答えを参照してください。

はSO LogicalOperationStackとパラレルの拡張機能について上でここにも私の質問を参照してください。私はそれがトレースのように見えるのテストで http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc

最後に、ここにもMicrosoftのパラレル拡張機能のフォーラムで私の質問を参照してください。 CorrelationManager.LogicalOperationStackは、Parallel.ForまたはParallel.Invokeを使用しているときに破損する可能性があります。メインスレッドで論理演算を開始してから、デリゲートで論理演算を開始/停止します。 DoLongRunningWorkが実行中の場合(DoLongRunningWorkをさまざまな手法で起動する前にメインスレッドで論理演算を開始する場合)、LogicalOperationStackは常に2つのエントリを持つ必要があります(上記の2つのリンクのいずれかを参照)。したがって、「壊れた」とは、LogicalOperationStackに最終的に2つ以上のエントリがあることを意味します。

これは、Parallel.ForとParallel.InvokeがメインスレッドをDoLongRunningWorkアクションを実行するための「ワーカー」スレッドの1つとして使用するためです。

CallContext.LogicalSetDataに格納されたスタックを使用してLogicalOperationStack(CallContext.SetDataを介して格納されているLog4netのLogicalThreadContext.Stacksに似ています)の動作を模倣すると、さらに悪い結果が得られます。このようなスタックを使用してコンテキストを管理している場合、メインスレッドに「論理演算」があり、各繰り返しで論理演算が行われるほとんどすべてのシナリオで、壊れてしまいます(つまり、予想されるエントリ数はありません)/DoLongRunningWorkデリゲートの実行

+2

Downvote?コメント無し?ありがとう。 – wageoghe

関連する問題