2016-07-15 17 views
1

複数のWCFサービスからデータを受信し、その結果をいくつか処理した後、SignalRを使用して単一の接続でIISサーバーに転送します。BlockingCollectionをキューとして使用しているコンシューマの非同期メソッドを待ちます。

WCFサービスがプロデューサであり、SignalRを使用してデータを送信するクラスがコンシューマであるプロデューサコンシューマパターンを使用して実装しようとしました。キューの場合、私はBlockingCollectionを使用しました。

ただし、await/asyncを使用してコンシューマのデータを送信する場合、whileループは他のすべてのスレッドがデータをキューに追加するまで停止します。

私は実際にデータを送信するコードをTask.Delay(1000).Wait();またはawait Task.Delay(1000);に置き換えてテストしましたが、どちらも問題にはなりません。 単純なThread.Sleep(1000);はうまく動作しているようですが、非同期コードが問題だと思うようになります。

私の質問です:非同期コードがwhileループで完了しないようにする何かがありますか?私は何が欠けていますか?

new Thread(Worker).Start(); 

そしてコンシューマコード:

私はこのような消費者のスレッドを開始しています

浪費家が正しく指摘したように、(名前が示すように) BlockingCollectionだけのために意図されて
private void Worker() 
{ 
    while (!_queue.IsCompleted) 
    { 
     IMobileMessage msg = null; 
     try 
     { 
      msg = _queue.Take(); 
     } 
     catch (InvalidOperationException) 
     { 
     } 

     if (msg != null) 
     { 
      try 
      { 
       Trace.TraceInformation("Sending: {0}", msg.Name); 
       Thread.Sleep(1000); // <-- works 
       //Task.Delay(1000).Wait(); // <-- doesn't work 
       msg.SentTime = DateTime.UtcNow; 
       Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
      } 
      catch (Exception e) 
      { 
       TraceException(e); 
      } 
     } 
    } 
} 
+2

ブロッキングと非同期は友人ではありません。 'async'と' BlockingCollection 'を混在させる場合は、' BlockingCollection 'を削除し、TPL Dataflowを見てください。 'BufferBlock 'は良い出発点であり、 'BlockingCollection 'とほぼ同じですが、データフローにはプロデューサ/コンシューマのシナリオでさらに多くの機能が用意されています。それについて学ぶために時間をかけてください。価値があります。 – spender

+0

ニース、ありがとう、私はそれを調べます。 –

答えて

2

ブロックコードで使用し、非同期コードではうまく動作しません。

BufferBlock<T>などの非同期互換のプロデューサ/コンシューマキューがあります。この場合、私はActionBlock<T>も良いだろうと思うだろう:

private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg => 
{ 
    try 
    { 
    Trace.TraceInformation("Sending: {0}", msg.Name); 
    await Task.Delay(1000); 
    msg.SentTime = DateTime.UtcNow; 
    Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
    } 
    catch (Exception e) 
    { 
    TraceException(e); 
    } 
}); 

これはあなたの全体の消費スレッドとメインループを置き換えます。

+0

私の消費者とキューを置き換えることを提案したコードを追加し、またTPLデータフローに関するあなたのブログの優れた記事を読んで、それが動作するように見えますが、今度はプロデューサタイマーが毎回2ティック後に停止するようにはまだ確認できません。しかし、私はそれが新しいことだと思う。私はそれが動作していることを確認できるようにすぐにあなたの答えを受け入れるよ!ありがとう! –

+0

なんらかの理由で私はそれを動作させることができないようです...私のプロデューサは毎秒WCFサービスからの新しいメッセージがあるかどうかチェックします( 'Task.Delay(1000); 'をループとして使用して、アクションブロックの遅延とプロデューサの遅延で使用する時間によっては、すべて正常に動作し、完全に停止したり、ブロックバッファにメッセージをポストしても、それ以上処理しません。 –

+0

@ J.Neijt:いいえ、非同期プロデューサがうまくいきます。TraceExceptionが例外をスローすると、ブロックが機能しなくなります。 –

関連する問題