2016-10-27 4 views
1

複数のデバイスが1つのAzure Service Busキュー(またはトピック)で終了するメッセージを送信しています。複数のメッセージを並行して処理したいが、は同じデバイスの2つのメッセージを同時に処理しないようにしたい。Azure Service Bus - 同じデバイスからのメッセージを並行して処理しないようにします。

次の図は目標を示しています。 3つの処理スレッドがあります(実際には複数のサーバーに分散された数十もの数があります)。各ボックスは1つのメッセージの処理時間を示し、色はどのデバイスに属しているかを示します。

Concurrent processing of the messages

あなたは時間内にノーポイントで二つ以上の重複メッセージがあることを確認することができ、同じデバイスから発信。

関与する複数の処理サーバが存在するので、私は、並行処理を防止するための唯一の方法は、パーティション化キーなどのデバイスIDを持つメッセージを分割することで、その後、唯一のパーティションごとに単一の消費者を持っていることを想像することができます:

enter image description here

したがって、「黄色のデバイス」からのすべてのメッセージはパーティション1に移動します。

私はまだ1つのプロセスで複数の処理スレッドを実行します。今、私たちは

var client = QueueClient.CreateFromConnectionString(connectionString, queueName); 
var options = new OnMessageOptions { MaxConcurrentCalls = x }; 
client.OnMessage(m => 
    { 
     // Process... 
     m.Complete(); 
    }); 

のような単純な何かをするにはどうすれば、このようなコードに並行性の制限が組み込まれていますか?

アクターやその他の並行処理メカニズムに基づいてクライアント側のソリューションを想像することができます。しかし、ブローカレベルでそれを解決する方法はありますか?

答えて

1

これはASB s Sessions feature. You will be able to use OnMessage` APIを利用するのに適していますが、特定のセッションの処理は複数のコンシューマではなく、単一のコンシューマによってのみ行われます。また、同時に実行して、負荷を処理することができます。

良い出発点は、QueueClient.AcceptMessageSessionAsync APIです。どのように動作するのか説明付きの堅実なドキュメントが必要な場合は、this sampleが最適なドコです。

+0

私はセッションを見ましたが、関連するメッセージの処理を1つのトランザクションでまとめてグループ化することを目的とした(間違った)印象を受けました。 Azureサンプルへのリンクをありがとう、 'IMessageSessionAsyncHandler'が私の必要とするものです。私は、無期限のセッション存続期間の場合に、ある時点でセッションを閉じる必要があるかどうかはまだ分かりません。 – Mikhail

+0

それは正しいです。私はあなたのメッセージ "ストリーム"が無限ではないことを示唆しました。そしてそのように、セッションを表すでしょう。それがシナリオではない場合、セッションはあなたにとって正しい選択肢ではありません。 –

+0

ストリームを見ている特定のケースでは、寿命が限られていますが、最終的に数日かかることがあります。 – Mikhail

関連する問題