2017-02-16 13 views
0

私が質問したのはhereです。なぜThread.Runを使用してプロセスを開始すると、私が期待したほど多くの同時要求が実行されなかったのですか?rabbitmqメッセージの処理concurrenrtly

この質問の背後にある理由は、私がrabbitmqキューからメッセージを引き出して同時に最大数の同時メッセージを処理できるクラスを作成しようとしていたためです。

これを行うには、私はEventingBasicConsumerクラスのReceivedハンドラーで次のようにしました。

async void Handle(EventArgs e) 
{ 
    await _semaphore.WaitAsync(); 

    var thread = new Thread(() => 
    { 
     Process(e); 
     _semaphore.Release(); 
     _channel.BasicAck(....); 
    }); 
    thread.Start(); 
} 

ただし、前の投稿のコメントは、CPUの動作をしない限りスレッドを開始しないというものでした。

上記のハンドラは、作業がCPUバインド、ネットワーク、ディスクまたはそれ以外であるかどうかを認識しません。 (Processは抽象メソッドです)。

ここでもスレッドまたはタスクを開始する必要があると思いますが、そうでなければProcessメソッドはrabbitmqスレッドをブロックし、イベントハンドラは終了するまで再度呼び出されません。だから私は一度に一つの方法しか扱えない。

新しいThreadはここから始まります。大丈夫ですか?もともと私はTask.Runを使用していましたが、これは必要な数の労働者を生産しませんでした。他の記事を参照してください。

FYI。同時スレッドの数は、セマフォ上にInitialCountを設定することによって制限されます。

答えて

0

既に言われているように、スレッドの数が多いとパフォーマンスが保証されません。その数が論理コアの数を上回るような場合、実際の作業が行われていない状態でthread starvationになります。

しかし、同時に実行する操作の数を処理する必要がある場合は、this tutorialのようにMaxDegreeOfParallelismの設定でTPL Dataflowライブラリに試してみてください。

var workerBlock = new ActionBlock<EventArgs>(
    // Process event 
    e => Process(e), 
    // Specify a maximum degree of parallelism. 
    new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = InitialCount 
    }); 
var bufferBlock = new BufferBlock(); 
// link the blocks for automatically propagading the messages 
bufferBlock.LinkTo(workerBlock); 

// asynchronously send the message 
await bufferBlock.SendAsync(...); 
// synchronously send the message 
bufferBlock.Post(...); 

はメッセージの順序が保持されます。また、あなたがフィルターラムダでブロックを結ぶと(並列処理の程度が異なると)異なるハンドラを追加することができます

bufferBlock.LinkTo(cpuWorkerBlock, e => e is CpuEventArgs); 
bufferBlock.LinkTo(networkWorkerBlock, e => e is NetworkEventArgs); 
bufferBlock.LinkTo(diskWorkerBlock, e => e is DiskEventArgs); 

をが、この場合では、セットアップは、チェーンの最後にデフォルトのハンドラはずなので、メッセージは、(あなたがこのためにNullTargetブロックを使用することができます)消えないだろう:

また
bufferBlock.LinkTo(DataflowBlock.NullTarget<EventArgs>); 

彼らは完全にUI側にReactive Extensionsで動作するので、ブロックは、オブザーバーである可能性があります。

関連する問題