私は少し非同期的なものを待っています。私はいくつかのプログラム間でメッセージの送受信にRabbitMQを使用しています。特定のスレッドでタスクが実行されないようにする
背景として、RabbitMQクライアントは、接続スレッドとハートビートスレッドの2つのスレッドを使用しています。 TCPを介してメッセージを受信するたびに、接続スレッドがメッセージを処理し、インタフェース経由でコールバックを呼び出します。ドキュメントでは、この呼び出し中に接続と同じスレッド上で行われているため、作業を続ける必要があるため、多くの作業を避けることが最善の方法です。彼らはQueueingBasicConsumer
を提供しています。これは、メッセージを受信するのを待つために使用されるブロッキングの「デキュー」メソッドを備えています。
私は消費者がこの待機時間中に実際にスレッドコンテキストを解放して他の人が仕事をすることができるようにしたいので、非同期/待機のタスクを使用することに決めました。
私はawaitableデキューの方法があります:私は、次の方法で
TaskCompletionSource
Sを使用していますAwaitableBasicConsumer
クラスを書いたこれが呼び出されます。
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
RabbitMQのクライアントが呼び出すコールバックタスクを満たしAMQP接続ねじ
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
rwLock
のコンテキストからであります。 2つのキュー(deliveryTCS
およびundelivered
)は、ConcurrentQueuesです。
問題:たまに
ごとに、デキュー・メソッドが例外をスロー待っ方法。このメソッドはasync
であるため、通常は問題にはならないため、タスクが入力する「例外」完了状態になります。問題は、DequeueAsync
を呼び出すタスクが、RabbitMQクライアントが作成するAMQP接続スレッドが待機した後に再開される状況にあります。通常、私はタスクがメインスレッドまたは周囲に浮かぶワーカースレッドの1つに再開するのを見ました。しかし、AMQPスレッドを再開して例外がスローされると、すべてが停止します。 タスクはに「例外状態」を入力せず、例外が発生したメソッドを実行中であることを示すAMQP接続スレッドが残ります。これは動作しませんなぜここ
私の主な混乱は次のとおりです。ここで
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
は、テスト用に設定RunAsync
方法で、次のとおりです。
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
私が書いたものを読んで、私は見ます私は私の問題をとてもうまく説明していないかもしれません。明確化が必要な場合は、尋ねてください。次のように私が出ている
私の解決策は以下のとおりです。
- すべての非同期/待つコードを削除し、単にスレッドとブロックまでまっすぐに使用します。パフォーマンスは低下しますが、少なくとも停止することはありません。
- タスクの再開にAMQPスレッドが使用されていることをどういうわけか除外します。私は彼らが寝ていたと仮定して、デフォルトの
TaskScheduler
を使用することに決めました。タスクスケジューラにそれらのスレッドが限界に達していないことを伝える方法を見つけることができれば、それは素晴らしいことです。
これはなぜ起こっているのか、これを解決するための提案がありますか?今はプログラムが信頼できるように非同期コードを削除していますが、ここで何が起こっているのか本当に分かりたいです。
*私は消費者がこの待機時間*中に実際にスレッドコンテキストを解放できるようにしたいと考えていました。待っているスレッドが他のスレッドの実行をブロックしたと言っていますか?そうだ。 。 。奇妙な –
RabbitMQクライアントは、何かを受け取るまで発信者をブロックする「QueuingBasicConsumer」を提供します。私はタスクの代わりに生のスレッドを使用していた場合、そのスレッドはスリープ状態になり、何もしません。私はこのブロックを避けて、その時間にスレッドが別のタスクを実行できるように(スループットを向上させるため)、実際に呼び出しスレッドをブロックするのではなく、タスクを使用する待ち時間のある消費者を書きました。 –
私はその部分を理解しました。私が理解できないことは、あなたが解決しようとしている問題です。'QueuingBasicConsumer'が有能に書かれていれば、それはビジーではない待機であり、ブロックされたスレッドは待機中にCPUサイクルを消費しないため、実行中の他の作業には影響しません。これらのスレッドをRabbitMQ専用にして、他のジョブには他のスレッドを使用してください。 –