2012-04-09 12 views
16

http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9見かけBufferBlock.Post/Receive/ReceiveAsyncレース/バグ

にクロスポスト...私は本当に、その最大の潜在的にTplDataflowを使用していませんよ。 ATM私は単純に、メッセージの受け渡しに安全なキューとしてBufferBlockを使用しています。ここでは、プロデューサとコンシューマが異なる速度で実行されています。私は の方法について私に困惑したままにしておかしく思っています。 (2000行分散液の一部である)上記のコードで

private BufferBlock<object> messageQueue = new BufferBlock<object>(); 

public void Send(object message) 
{ 
    var accepted=messageQueue.Post(message); 
    logger.Info("Send message was called qlen = {0} accepted={1}", 
    messageQueue.Count,accepted); 
} 

public async Task<object> GetMessageAsync() 
{ 
    try 
    { 
     var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); 
     //despite messageQueue.Count>0 next line 
     //occasionally does not execute 
     logger.Info("message received"); 
     //....... 
    } 
    catch(TimeoutException) 
    { 
     //do something 
    } 
} 

Sendは100msごとかそこら定期的に呼び出されています。これは、項目がPostで、messageQueueに約10回/秒であることを意味します。これが確認されます。ただし、ReceiveAsyncがタイムアウト時間内に完了しない場合があります(PostReceiveAsyncを完了させていません)。TimeoutExceptionが30秒後に発生していることがあります。この時点で、messageQueue.Countは数百になっています。これは予期せぬことです。この問題は、ポスティング速度が遅い(1ポスト/秒)場合に観察され、通常1000個のアイテムがBufferBlockを通過する前に発生します。

(原因で発生上記のバグに)受信したときに、この問題を回避するために、私は作品次のコードを使用して、時折午前1秒の待ち時間が発生する

public async Task<object> GetMessageAsync() 
    { 
     try 
     { 
      object m; 
      var attempts = 0; 
      for (; ;) 
      { 
       try 
       { 
        m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); 
       } 
       catch (TimeoutException) 
       { 
        attempts++; 
        if (attempts >= 30) throw; 
        continue; 
       } 
       break; 

      } 

      logger.Info("message received"); 
      //....... 
     } 
     catch(TimeoutException) 
     { 
      //do something 
     } 
    } 

これは競合状態のように見えますTDFでは私には同じような方法でBufferBlockを使用している他の場所ではなぜこのようなことが起こらないのかを知ることができません。実験的にReceiveAsyncからReceiveに変更することは役に立ちません。私はチェックしていませんが、私は孤立していると想像しています。上のコードは完全に動作します。これは、「TPLデータフローの紹介」tpldataflow.docxに記載されているパターンです。

私はこれの最下部に到達するために何ができますか?何が起きているのかを推測するのに役立つ指標がありますか?信頼できるテストケースを作成できない場合は、さらに多くの情報を提供できますか?

ヘルプ!

+1

あなたのしていることや期待していることに間違いはありません。私は間違いなく、MSDNフォーラムでこれ以上アクティブにしておく必要があると思います。あなたはすでに@StephenToubの注目を集めています。彼は間違いなく探している人です。 –

+0

決してそれの底にはいない。私は小さな自己完結型の例で問題を再現することができませんでした。 BufferBlockだけを使用していたので、代わりに私自身の非同期キュー実装をロールバックしました。私は他のコードを変更する必要はありませんでした...私は使用していたBufferBlockインターフェイスの部分を再実装しました。私は今、何か気が散っていると思っていますが、私はそれを証明することはできません。 Grr。 – spender

+0

@spendor非常に面白いですが、奇妙なことに、私はBufferBlockを見つけた後、自分自身の非同期並行キュー実装を破棄しました...今私は再考する必要があります。ありがとう。 –

答えて

1

スティーブンは、以下の解決策であると考えているようだ

するvarメートル= messageQueue.ReceiveAsync()を待ちます。

代わりに:

するvar M =待つmessageQueue.ReceiveAsync(TimeSpan.FromSeconds(30))。

これを確認または拒否できますか?

+0

それはうまくいかなかった。私が選んだReceiveAsyncのオーバーロードは問題ではなく、結果は同じでした。上記の私のコメントを参照してください。 – spender