1

以下は、ローカルSFクラスタで実行され、サービスバスキューをリッスンするステートレスサービスリスナーコードです。なぜICommunicationListener.Abort()は引き続き呼び出されますか?

私が望むのは、常時接続のSFサービスでキューコマンドを継続的に聞くことです。コード改善のヒントは大歓迎です!

問題#1

中止は、()連続して効率的に私の接続を閉じ、と呼ばれています。この現象の原因は何ですか?どのように修正するのですか?私の理解では、Abort()は未処理の例外や強制終了の場合にのみ呼び出されるはずであり、どちらも私が認識していません。

ボーナス問題は、我々のキューが正しく処理することができます中止()からCloseClientコールを、コメントアウト言います。 1番目のメッセージの後、CancellationTokenのWaitHandleは破棄されたものとしてマークされ、コールバックに渡すと例外がスローされます。これを引き起こしているのは何ですか?

ありがとうございました!

using System; 
using System.Threading; 
using System.Threading.Tasks; 
using Microsoft.ServiceBus.Messaging; 
using Microsoft.ServiceFabric.Services.Communication.Runtime; 

namespace Common 
{ 
    public class QueueListener : ICommunicationListener 
    { 
     private readonly string _connectionString; 
     private readonly string _path; 
     private readonly Action<BrokeredMessage> _callback; 

     private QueueClient _client; 

     public QueueListener(string connectionString, string path, Action<BrokeredMessage> callback) 
     { 
      // Set field values 
      _connectionString = connectionString; 
      _path = path; 

      // Save callback action 
      _callback = callback; 
     } 

     public Task<string> OpenAsync(CancellationToken cancellationToken) 
     { 
      // Connect to subscription 
      _client = QueueClient.CreateFromConnectionString(_connectionString, _path); 

      // Configure the callback options 
      var options = new OnMessageOptions 
      { 
       AutoComplete = false 
      }; 

      // Catch and throw exceptions 
      options.ExceptionReceived += (sender, args) => throw args.Exception; 

      // Wire callback on message receipt 
      _client.OnMessageAsync(message => 
      { 
       return Task.Run(() => _callback(message), cancellationToken) 
        .ContinueWith(task => 
        { 
         if (task.Status == TaskStatus.RanToCompletion) 
          message.CompleteAsync(); 
         else 
          message.AbandonAsync(); 
        }, cancellationToken); 
      }, options); 

      return Task.FromResult(_client.Path); 
     } 

     public Task CloseAsync(CancellationToken cancellationToken) 
     { 
      CloseClient(); 
      return Task.FromResult(_client.Path); 
     } 

     public void Abort() 
     { 
      CloseClient(); 
     } 

     private void CloseClient() 
     { 
      // Make sure client is still open 
      if (_client == null || _client.IsClosed) 
       return; 

      // Close connection 
      _client.Close(); 
      _client = null; 
     } 
    } 
} 

答えて

0

問題は、サービスバスとインターフェイスするためのICommunicationListenerの作成の背後にある概念全体でした。リスナーは何も聞いていません!

StatelessService実装内のデフォルトのRunAsync()メソッドにサービスバス接続をリファクタリングすると、およびが問題を修正し、キャンセルトークンを期待どおりに監視できるようになります。

3

それが今であるようOpenAsyncに渡されるキャンセルトークンは、メソッドの範囲の外で使用されるものではありません。あなたはOnMessageにそれを渡しています。 OpenAsyncが呼び出された後にOnMessageが呼び出されますが、OpenAsynの完了後にトークンが破棄されます。

CancellationTokenを新規作成するか、このナゲットパッケージを使用してhereに役立ててください。

+0

ありがとうございました!完璧な意味合いを持つ。主な問題のアイデアは? – denious

+0

私は、OnMessageが処理されたトークンのため例外を生成すると考えています。これによりExceptionReceivedイベントが発生し、処理されない例外が発生し、Abortがトリガーされます。しかし、デバッグして確認する必要があります。 – LoekD

+0

これはそれではありませんでしたが、私はそれを理解しました。再度、感謝します! – denious

関連する問題