2016-09-27 6 views
1

をトリガし、私は、サービス・バスを持って、次の形式の機能をトリガ:AzureのWebJobs SDK:message.completeを呼び出していない()は、特定のServiceBusの機能

public static void ProcessJobs([ServiceBusTrigger("topicname", "subscriptionname", AccessRights.Listen)] BrokeredMessage input, [ServiceBus("topic2name", "subscription2name", AccessRights.Send)] out BrokeredMessage output) 
{ 
    output = new BrokeredMessage(input.GetBody<String>()); 
} 

私のユースケースは、この関数は単にからのメッセージを取るということです1つのトピックを別のトピックにプッシュします。私は、プロセスのソーストピックからメッセージを削除したくありません。

これは可能ですか?

さらに、AccessRightsに関する詳細情報と、それらがメッセージへのアクセスにどのように影響するかについても知ることができます。例:上記の例では、入力トピックのメッセージをAccessRights.Listenとしていますが、これらのメッセージに対して関数呼び出しが行われると、メッセージは「削除」されているようです。

+0

なぜメッセージを完了しないのですか? – Thomas

+0

@トーマス:これがアンチパターンであるかどうかは分かりませんが、サービスバスのキューを入れ子にすることを検討していました。したがって、この場合、親キューはそのアイテムを子キューに渡します。子プロセスが完了すると、(親キューアイテムとともに)親キューアイテムが削除されます。これは特定の技術的理由ではなく、アプリケーションのセマンティクスのためのものです。 –

+0

トピックを使用する場合、複数のサブスクリプションを持つことができ、親/子キュー間の通信を処理するためにFilterを使用できます。メッセージを完成させないと、メッセージを再処理することもできます。 – Thomas

答えて

2

Jamborは、デフォルトの動作は、関数が正常に終了した場合にメッセージを完了(documentationを参照)、または関数が失敗した場合に放棄することで言ったように。

あなたはMessageProcessorクラスのコードを見てSDK Repoでこの動作の実装を見ることができます:

public virtual async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken) 
{ 
    if (result.Succeeded) 
    { 
     if (!MessageOptions.AutoComplete) 
     { 
      // AutoComplete is true by default, but if set to false 
      // we need to complete the message 
      cancellationToken.ThrowIfCancellationRequested(); 
      await message.CompleteAsync(); 
     } 
    } 
    else 
    { 
     cancellationToken.ThrowIfCancellationRequested(); 
     await message.AbandonAsync(); 
    } 
} 

興味深い点は:これは仮想関数です。

ServiceBusConfigurationは、MessagingProviderプロパティを公開します。

/// <summary> 
/// Creates a <see cref="MessageProcessor"/> for the specified ServiceBus entity. 
/// </summary> 
/// <param name="entityPath">The ServiceBus entity to create a <see cref="MessageProcessor"/> for.</param> 
/// <returns>The <see cref="MessageProcessor"/>.</returns> 
public virtual MessageProcessor CreateMessageProcessor(string entityPath) 
{ 
    if (string.IsNullOrEmpty(entityPath)) 
    { 
     throw new ArgumentNullException("entityPath"); 
    } 
    return new MessageProcessor(_config.MessageOptions); 
} 

この機能は次のとおりです。あなたがSDK RepoのデフォルトMessagingProviderクラスのコードを見ている場合は

、あなたは新しいMessageProcessorを作成する責任があるメソッドをオーバーライドすることができていることがわかりますまた、バーチャル。

今、あなたはあなた自身のMessagingProviderMessageProcessor実装を作成することができます

public class CustomMessagingProvider : MessagingProvider 
{ 
    private readonly ServiceBusConfiguration _config; 

    public CustomMessagingProvider(ServiceBusConfiguration config) : base(config) 
    { 
     _config = config; 
    } 

    public override MessageProcessor CreateMessageProcessor(string entityPath) 
    { 
     if (string.IsNullOrEmpty(entityPath)) 
     { 
      throw new ArgumentNullException("entityPath"); 
     } 
     return new CustomMessageProcessor(_config.MessageOptions); 
    } 

    class CustomMessageProcessor : MessageProcessor 
    { 
     public CustomMessageProcessor(OnMessageOptions messageOptions) : base(messageOptions) 
     { 
     } 

     public override async Task CompleteProcessingMessageAsync(BrokeredMessage message, FunctionResult result, CancellationToken cancellationToken) 
     { 
      if (!result.Succeeded) 
      { 
       cancellationToken.ThrowIfCancellationRequested(); 
       await message.AbandonAsync(); 
      } 
     } 
    } 
} 

をそして、そのようなあなたのJobHostを設定:

public static void Main() 
{ 
    var config = new JobHostConfiguration(); 
    var sbConfig = new ServiceBusConfiguration 
    { 
     MessageOptions = new OnMessageOptions 
     { 
      AutoComplete = false 
     } 
    }; 
    sbConfig.MessagingProvider = new CustomMessagingProvider(sbConfig); 
    config.UseServiceBus(sbConfig); 
    var host = new JobHost(config); 
    host.RunAndBlock(); 
} 

これは、この質問の技術的な部分のためだった...

メッセージを完了していない場合、メッセージは同じ機能に対して再び利用可能になります。あなたがMaxDeliveryCountに到達するまでもう一度dを押すとメッセージが死んでしまいます。したがって、あなたの関数が等価であるように設計しても、これはあなたが望むものではないと確信しています。

多分、あなたが達成しようとしていることをもう少し説明してください。

あなたは、そうでない場合:あなたは(コメントを質問参照)親子キューの通信を探しているなら

、ASBとワークフローを設計する方法を説明する良い記事がありますBrokerMessageオブジェクトのDeferメソッドを見ることができます。

受信者がprocを延期したいことを示しますこのメッセージのエッセイ。

子メッセージが処理されるまで親メッセージを保持することができます。

0

プロセスのソーストピックからメッセージを削除したくないです。

this documentから、トリガーが完了した後にServiceBusTriggerがcomplete関数を呼び出すことがわかります。これがデフォルトモードです。ここではその記事からの抜粋です:関数が正常に終了し、または関数が失敗した場合の通話が放棄場合

SDKは、メッセージに完全PeekLockモードと呼び出しでメッセージを受け取ります。関数がPeekLockタイムアウトよりも長く実行されると、ロックは自動的に更新されます。