9

私は少し非同期的なものを待っています。私はいくつかのプログラム間でメッセージの送受信にRabbitMQを使用しています。特定のスレッドでタスクが実行されないようにする

背景として、RabbitMQクライアントは、接続スレッドとハートビートスレッドの2つのスレッドを使用しています。 TCPを介してメッセージを受信するたびに、接続スレッドがメッセージを処理し、インタフェース経由でコールバックを呼び出します。ドキュメントでは、この呼び出し中に接続と同じスレッド上で行われているため、作業を続ける必要があるため、多くの作業を避けることが最善の方法です。彼らはQueueingBasicConsumerを提供しています。これは、メッセージを受信するのを待つために使用されるブロッキングの「デキュー」メソッドを備えています。

私は消費者がこの待機時間中に実際にスレッドコンテキストを解放して他の人が仕事をすることができるようにしたいので、非同期/待機のタスクを使用することに決めました。

私はawaitableデキューの方法があります:私は、次の方法でTaskCompletionSource Sを使用していますAwaitableBasicConsumerクラスを書いたこれが呼び出されます。

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    //we are enqueueing a TCS. This is a "read" 
    rwLock.EnterReadLock(); 

    try 
    { 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

     //if we are cancelled before we finish, this will cause the tcs to become cancelled 
     cancellationToken.Register(() => 
     { 
      tcs.TrySetCanceled(); 
     }); 

     //if there is something in the undelivered queue, the task will be immediately completed 
     //otherwise, we queue the task into deliveryTCS 
     if (!TryDeliverUndelivered(tcs)) 
      deliveryTCS.Enqueue(tcs); 
     } 

     return tcs.Task; 
    } 
    finally 
    { 
     rwLock.ExitReadLock(); 
    } 
} 

RabbitMQのクライアントが呼び出すコールバックタスクを満たしAMQP接続ねじ

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    //we want nothing added while we remove. We also block until everybody is done. 
    rwLock.EnterWriteLock(); 
    try 
    { 
     RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 

     bool sent = false; 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs; 
     while (deliveryTCS.TryDequeue(out tcs)) 
     { 
      //once we manage to actually set somebody's result, we are done with handling this 
      if (tcs.TrySetResult(e)) 
      { 
       sent = true; 
       break; 
      } 
     } 

     //if nothing was sent, we queue up what we got so that somebody can get it later. 
     /** 
     * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the 
     * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are 
     * doing our thing here. 
     */ 
     if (!sent) 
     { 
      undelivered.Enqueue(e); 
     } 
    } 
    finally 
    { 
     rwLock.ExitWriteLock(); 
    } 
} 

rwLockのコンテキストからであります。 2つのキュー(deliveryTCSおよびundelivered)は、ConcurrentQueuesです。

問題:たまに

ごとに、デキュー・メソッドが例外をスロー待っ方法。このメソッドはasyncであるため、通常は問題にはならないため、タスクが入力する「例外」完了状態になります。問題は、DequeueAsyncを呼び出すタスクが、RabbitMQクライアントが作成するAMQP接続スレッドが待機した後に再開される状況にあります。通常、私はタスクがメインスレッドまたは周囲に浮かぶワーカースレッドの1つに再開するのを見ました。しかし、AMQPスレッドを再開して例外がスローされると、すべてが停止します。 タスクはに「例外状態」を入力せず、例外が発生したメソッドを実行中であることを示すAMQP接続スレッドが残ります。これは動作しませんなぜここ

私の主な混乱は次のとおりです。ここで

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards 

ConsumerTaskState state = new ConsumerTaskState() 
{ 
    Connection = connection, 
    CancellationToken = cancellationToken 
}; 

//if there is a problem, we execute our faulted method 
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called 
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted); 

は、テスト用に設定RunAsync方法で、次のとおりです。

public async Task RunAsync() 
{ 
    using (var channel = this.Connection.CreateModel()) 
    { 
     ... 
     AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel); 
     var result = consumer.DequeueAsync(this.CancellationToken); 

     //wait until we find something to eat 
     await result; 

     throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls 
     ... 
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state 
} 

私が書いたものを読んで、私は見ます私は私の問題をとてもうまく説明していないかもしれません。明確化が必要な場合は、尋ねてください。次のように私が出ている

私の解決策は以下のとおりです。

  • すべての非同期/待つコードを削除し、単にスレッドとブロックまでまっすぐに使用します。パフォーマンスは低下しますが、少なくとも停止することはありません。
  • タスクの再開にAMQPスレッドが使用されていることをどういうわけか除外します。私は彼らが寝ていたと仮定して、デフォルトのTaskSchedulerを使用することに決めました。タスクスケジューラにそれらのスレッドが限界に達していないことを伝える方法を見つけることができれば、それは素晴らしいことです。

これはなぜ起こっているのか、これを解決するための提案がありますか?今はプログラムが信頼できるように非同期コードを削除していますが、ここで何が起こっているのか本当に分かりたいです。

+0

*私は消費者がこの待機時間*中に実際にスレッドコンテキストを解放できるようにしたいと考えていました。待っているスレッドが他のスレッドの実行をブロックしたと言っていますか?そうだ。 。 。奇妙な –

+0

RabbitMQクライアントは、何かを受け取るまで発信者をブロックする「QueuingBasicConsumer」を提供します。私はタスクの代わりに生のスレッドを使用していた場合、そのスレッドはスリープ状態になり、何もしません。私はこのブロックを避けて、その時間にスレッドが別のタスクを実行できるように(スループットを向上させるため)、実際に呼び出しスレッドをブロックするのではなく、タスクを使用する待ち時間のある消費者を書きました。 –

+1

私はその部分を理解しました。私が理解できないことは、あなたが解決しようとしている問題です。'QueuingBasicConsumer'が有能に書かれていれば、それはビジーではない待機であり、ブロックされたスレッドは待機中にCPUサイクルを消費しないため、実行中の他の作業には影響しません。これらのスレッドをRabbitMQ専用にして、他のジョブには他のスレッドを使用してください。 –

答えて

5

awaitがどのようにコンテキストを取得し、それを使用して実行を再開するかを正確に説明する私のasync introを読むことをお勧めします。つまり、現在のSynchronizationContext(またはがnullの場合は現在のTaskScheduler)をキャプチャします。

もう1つの重要な詳細は、async継続がTaskContinuationOptions.ExecuteSynchronously(@svickはコメントで指摘されているように)でスケジュールされていることです。私はblog post about thisを持っていますが、AFAIKは公式にはどこにも書かれていません。この詳細になりますasyncプロデューサー/コンシューマーキューを書くことが困難になります。

のRabbitMQのスレッドがSynchronizationContextTaskScheduler持っていないため、awaitは(たぶん)で、「元のコンテキストに切り替える」されていない理由 - それらのスレッドが見えるので、あなたがTrySetResult呼び出すときので、継続が直接実行されます通常のスレッドプールスレッドと同様です。

あなたのコードを読んで、私はあなたのリーダー/ライターのロックと同時キューの使用が間違っていると思われます。私はコード全体を見ることなく確認することはできませんが、それは私の印象です。

既存のasyncキューを使用し、その周りにコンシューマーを構築することを強くお勧めします。 BufferBlock<T>タイプがTPL Dataflowの場合は、asyncキューとして動作できます。あなたのプラットフォームでデータフローが利用可能な場合は、これが私の最初の勧告となります。それ以外の場合は、AsyncProducerConsumerQueue type in my AsyncEx library、またはwrite your own(私のブログで説明しています)とすることができます。ここで

BufferBlock<T>を使った例です:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 
    _queue.Post(e); 
} 

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    return _queue.ReceiveAsync(cancellationToken); 
} 

この例では、私はあなたのDequeueAsync APIを維持しています。ただし、TPLデータフローの使用を開始した後は、TPLデータフローを他の場所でも使用することを検討してください。このようなキューが必要な場合は、コードの他の部分もデータフローのアプローチの恩恵を受けることが一般的です。たとえば、DequeueAsyncというメソッドを呼び出す代わりに、BufferBlockActionBlockにリンクすることができます。

+0

私はおそらく意図されていない方法でReaderWriterLockSlimを使用しました。私は、複数の人がDequeueAsyncを実行できると言っていましたが、イベントが処理されている間は、誰もハンドラ以外のいずれのキューにも触れてはいけません。 DeelueAsyncを呼び出す前に受信したメッセージを見逃すことがないように、「未配信」キューを追加するまでは必要ありませんでした。私はそれを間違って使っていますか?ここには、AwaitableBasicConsumerの完全なコードがあります:https://gist.github.com/kcuzner/7242836 –

+0

それはうまくいくようです。しかし、それは単純な「ロック」の上にはあなたをあまり魅了しません。別の問題があります: 'cancellationToken.Register'はデリゲートをインラインで実行することができ、' SetResult'は失敗します。私がむしろスティーブン・トゥーブに残すもう一つの不明瞭なコーナー条件。 :) –

+0

もう1つのフォローアップの質問:RabbitMQは、単に新しいスレッド(新しいThreadStart(...)で標準的な方法でスレッドを作成します。これらのスレッドはタスクを実行する資格がありますか、 (私が今までには分かっていなかった) 'BufferBlock 'を使うことができますし、そうでないかもしれません。 –