2016-11-22 6 views
0

これはthis questionと重複している可能性がありますが、これはデータベースのバッチ更新についての説明と混同されています。別のスレッドからアクセスした後にBrokeredMessageを処理しました

Azure Service Busのキューを使用した簡単な例では、キューに入れたBrokeredMessageにアクセスできません。別のスレッドからキューを読み込むと、常に破棄されます。

サンプルコード:

class Program { 
    private static string _serviceBusConnectionString = "XXX"; 

    private static BlockingCollection<BrokeredMessage> _incomingMessages = new BlockingCollection<BrokeredMessage>(); 
    private static CancellationTokenSource _cancelToken = new CancellationTokenSource(); 

    private static QueueClient _client; 

    static void Main(string[] args) { 

     // Set up a few listeners on different threads 
     Task.Run(async() => { 
      while (!_cancelToken.IsCancellationRequested) { 
       var msg = _incomingMessages.Take(_cancelToken.Token); 
       if (msg != null) { 
        try { 
         await msg.CompleteAsync(); 
         Console.WriteLine($"Completed Message Id: {msg.MessageId}"); 
        } catch (ObjectDisposedException) { 
         Console.WriteLine("Message was disposed!?"); 
        } 
       } 
      } 
     }); 


     // Now set up our service bus reader 
     _client = GetQueueClient("test"); 

     _client.OnMessageAsync(async (message) => { 
      await Task.Run(() => _incomingMessages.Add(message)); 
     }, 
     new OnMessageOptions() { 
      AutoComplete = false 
     }); 

     // Now start sending 
     Task.Run(async() => { 
      int sent = 0; 
      while (!_cancelToken.IsCancellationRequested) { 
       var msg = new BrokeredMessage(); 
       await _client.SendAsync(msg); 
       Console.WriteLine($"Sent {++sent}"); 
       await Task.Delay(1000); 
      } 
     }); 

     Console.ReadKey(); 
     _cancelToken.Cancel(); 

    } 

    private static QueueClient GetQueueClient(string queueName) { 

     var namespaceManager = NamespaceManager.CreateFromConnectionString(_serviceBusConnectionString); 
     if (!namespaceManager.QueueExists(queueName)) { 
      var settings = new QueueDescription(queueName); 
      settings.MaxDeliveryCount = 10; 
      settings.LockDuration = TimeSpan.FromSeconds(5); 
      settings.EnableExpress = true; 
      settings.EnablePartitioning = true; 
      namespaceManager.CreateQueue(settings); 
     } 

     var factory = MessagingFactory.CreateFromConnectionString(_serviceBusConnectionString); 
     factory.RetryPolicy = new RetryExponential(minBackoff: TimeSpan.FromSeconds(0.1), maxBackoff: TimeSpan.FromSeconds(30), maxRetryCount: 100); 
     var queueClient = factory.CreateQueueClient(queueName); 

     return queueClient; 
    } 
} 

私は設定で遊んで試してみたが、これは動作させることはできません。何か案は?

答えて

2

マイクロソフトhere @ Serkantカラカからの応答で自分の質問に答える:

非常に基本的なルールを、私はこれが文書化されているかどうかわからないです。受信したメッセージは、コールバック関数の存続時間内に処理する必要があります。あなたのケースでは、非同期コールバックが完了するとメッセージが破棄されるので、別のスレッドでObjectDisposedExceptionが発生して完全な試行が失敗するのはこのためです。

さらなる処理のためのキューイングメッセージがスループットにどのように役立つかは実際にはわかりません。これは確実にクライアントに多くの負担をかけることになります。実行可能なはずの非同期コールバックでメッセージを処理してみてください。

バガー。

関連する問題