2017-07-03 13 views
0

コレクションに例外例外が修正されていることを認識していますが、このインスタンスでは表示されません。私はそれを修正する方法を知っている、私はちょうどここで発生する理由を理解したい。コレクションのロック保護に関係なくコレクションが変更されました

私はTaskCompletionSourcesのセットを持っており、そのセットへのアクセスを保護するlockObjectを持っています。 1つのタスク(T1)では、TCSを作成し、タスクが完了するまで最大3秒間待機します。

他のタスク(T2)では、私は0.5秒待ってから、T1が待っているタスクを完了したいと思います。

このコードスニペットではTCSのセットはまったく使用されていませんが、私が実際に作業しているプログラムでは、これは特定の数の異なるウェイターのリストを保持することです。ウェイターのリストもクリアされるはずです。このスニペットでは、ウェイター(T1)は1つしかありませんが、問題を再現するためにTCSのセットを使用する必要があります。

プログラムは、次の出力を生成します。

T1 start. 
Wait start. 
Add start. 
Add end. 
T2 start. 
CompleteAndClear start. 
Completing 1 TCSs. 
Remove start. 
Remove end. 
Wait end. 
Wait succeeded. 
T1 end. 

Unhandled Exception: System.AggregateException: One or more errors occurred. (Collection was modified; enumeration operation may not execute.) ---> System.InvalidOperationException: Collection was modified; enumeration operation may not execute. 
    at System.Collections.Generic.HashSet`1.Enumerator.MoveNext() 
    at ConsoleApp1.Program.CompleteAndClear() in Program.cs:line 104 
    at ConsoleApp1.Program.<T2Async>d__5.MoveNext() in Program.cs:line 45 
... 

私は理解していないもの:

  • を例外がスローされるのはなぜ?
  • CompleteAndClearがまだロックを保持している間、Re​​moveメソッドを開始して終了する方法はありますか?

TrySetResultは、Lockを保持する同じスレッドを使用してWaitを終了させるように思われます。現在のスレッドはWaitAsync関数にジャンプし、Removeに進み、ロックはその事実によってバイパスされます。このスレッドはCompleteAndClear(ロックは同じスレッドによって再入可能)からのロックを保持し、HashSetが削除されてから例外が呼び出されます。しかし、CompleteAndClearを実行しているスレッドは、その結果を設定してタスクを完了としてマークし、そのセットをクリアしてロックを解除してから、ロックだけを入力し、「TCSが見つかりません」と報告する必要があります。

コード内の些細な修正が完璧に動作しますが、意図と一緒に行かない

 if (!res) Remove(tcs); 

 Remove(tcs); 

を交換することです。もう1つは、それを消去する前にセットのコピーを作成し、そのコピーで結果を設定することで、そのケースを完全に解決します。

コード:

using System; 
using System.Collections.Generic; 
using System.Threading.Tasks; 
using System.Threading; 

namespace ConsoleApp1 
{ 
    class Program 
    { 
    static object lockObject = new object(); 

    static HashSet<TaskCompletionSource<bool>> completionSources = new HashSet<TaskCompletionSource<bool>>(); 

    static void Main(string[] args) 
    { 
     MainAsync().Wait(); 
    } 

    static async Task MainAsync() 
    { 
     Task t1 = T1Async(); 
     Task t2 = T2Async(); 
     await t1; 
     await t2; 
    } 

    static async Task T1Async() 
    { 
     Console.WriteLine("T1 start."); 

     if (await WaitAsync()) Console.WriteLine("Wait succeeded."); 
     else Console.WriteLine("Wait failed."); 

     Console.WriteLine("T1 end."); 
    } 

    static async Task T2Async() 
    { 
     Console.WriteLine("T2 start."); 

     await Task.Delay(500); 
     CompleteAndClear(); 

     Console.WriteLine("T2 end."); 
    } 


    static async Task<bool> WaitAsync() 
    { 
     Console.WriteLine("Wait start."); 
     bool res = false; 
     TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(); 
     using (CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000)) 
     { 
     using (CancellationTokenRegistration cancellationTokenRegistration = cancellationTokenSource.Token.Register(() => { tcs.TrySetResult(false); })) 
     { 
      Add(tcs); 

      res = await tcs.Task; 

      Remove(tcs); 
     } 
     } 
     Console.WriteLine("Wait end."); 
     return res; 
    } 

    static void Add(TaskCompletionSource<bool> TaskCompletionSource) 
    { 
     Console.WriteLine("Add start."); 

     lock (lockObject) 
     { 
     completionSources.Add(TaskCompletionSource); 
     } 

     Console.WriteLine("Add end."); 
    } 

    static void Remove(TaskCompletionSource<bool> TaskCompletionSource) 
    { 
     Console.WriteLine("Remove start."); 

     lock (lockObject) 
     { 
     if (!completionSources.Remove(TaskCompletionSource)) Console.WriteLine("TCS not found."); 
     } 

     Console.WriteLine("Remove end."); 
    } 


    static void CompleteAndClear() 
    { 
     Console.WriteLine("CompleteAndClear start."); 
     lock (lockObject) 
     { 
     if (completionSources.Count > 0) 
     { 
      Console.WriteLine("Completing {0} TCSs.", completionSources.Count); 
      foreach (TaskCompletionSource<bool> tcs in completionSources) 
      tcs.TrySetResult(true); 

      Console.WriteLine("Clearing TCS list."); 
      completionSources.Clear(); 
     } 
     } 
     Console.WriteLine("CompleteAndClear end."); 
    } 

    } 
} 

答えて

1

問題の核心は、TaskCompletionSource<T>.TrySetResultを同期が実際await does use that flagことと合わせTaskContinuationOption.ExecuteSynchronouslyに登録されたすべてのタスクの継続を呼び出すことです。

したがって、CompleteAndClearはロックを受け取り、を保持しながらTrySetResultを呼び出します。Since this is in a free-threaded context,TrySetResultWaitAsyncメソッドを再開し、メソッドを呼び出します。これは、ロックを受け取り(再帰的ロックを許可するので、lockが成功するため)、コレクションを変更します。 TrySetResultが返されると(Removeの実行後)、列挙子は問題を検出して例外をスローします。

ここでは、いくつかの(IMOに疑問のある)設計上の決定があります。私はawait using ExecuteSynchronouslyと同様にrecursive locks in generalを対象にしています(「不整合な不変量」の項を特にこのシナリオに適用します)。

ただし、key principles of multithreading: never invoke arbitrary code while holding a lockのいずれかに厳密に従うことで、これらの設計上の決定に固有の問題を回避できます。もちろん、明らかでないことは、TaskCompletionSource<T>.TrySetResultが任意のコードを呼び出すことです。

今、解決策を紹介します。

新しいランタイム(netstandard1.3/.NET Core 1.0以上と考えている)を十分にターゲットにしている場合は、をTaskCompletionSource<T>コンストラクタに渡すことができます。これにより、最も望ましい動作が得られます。タスクは即座に同期して完了しますが、すべての継続は強制的に非同期になります。

旧式のプラットフォームでは、デリゲート内で「完了」作業(つまりTrySetResultの呼び出し)をカプセル化して(IDisposableでさらにラッピングすることを推奨します)、メソッドがロックを解除するまでその作業を延期できます。

最後に、非同期互換コーディネーションプリミティブを作成してから、これらを使用して作業キューなどのより複雑な構造を構築することをお勧めします。コードの一部でこれらの微妙な状況に対処するのはずっと簡単ですし、場合によってはそれを外注することもできます。たとえば、私のAsyncEx libraryには、非同期互換コーディネーションプリミティブの完全なスイートがあります。 v5 uses the new RunContinuationsAsynchronouslyフラグ、v4 uses the delay-completion-with-IDisposable workaround

+0

素敵な説明をありがとう。あなたは私を "' TrySetResult'で任意のコードを呼び出すことができます。 " - 私はこれを知らなかった。そのドキュメンタリーは私の好みのためには短すぎます。あなたがここで何を言っているのか分かっていれば、 'await'のデフォルトの動作は' ExecuteSynchronously'で、TCSの作成時に 'TaskCreationOptions'を使うとオーバーライドすることができます。そうですか?しかし、あなたがリンクしている記事では、これらのオプションが尊重されないことがあると言われますが(しかし、それは最悪のケースと思われます)、このため、 'Continuations Asynchronously'を決して使用しないで同期モードで実行しますか? – Wapac

+0

2つの質問があります:再帰的なロックが悪いと書いたので、一般的には 'lock()'構造体を使わないことをお勧めしますか?もしそうなら、その代わりに何をお勧めしますか?もう一つは、もし私が何かを見逃していなければ、ここでは非再帰的なロック機構を使っても私は全く助けにならないでしょうか? – Wapac

+0

2番目のコメントの私の最初の質問は、実際には、私の 'lock()'を非再帰的に(私のコードの設計によって)使っても、それらを避けて別の構造を使う方が良いでしょうか? – Wapac

関連する問題