2017-05-06 16 views
3

私はActiveMQを初めて使用しましたが、耐久性のあるパブリッシャを作成できました。クライアントIDを設定することはできません。クライアントIDでプロパティを見つけられず、Googleでも見つけることができないためです。私はいくつかのサンプルコードを取得する場合は大きな助けになるでしょう。クライアントIDとサブスクライバ名およびトピック名を使用して.NET CoreのAMQP.Net Liteライブラリを使用して永続的なパブリッシャ/サブスクライバトピックを作成する方法

注: NMSプロトコルではありません。私はAMQP.Net LiteをActiveMQ with .NET Core Web APIで使用して、ClientIdで永続的なパブリッシャ/サブスクライバを作成しています。

+0

応答を待っています... – Mhasan

答えて

2

ActiveMQまたはActiveMQ Artemisへの恒久サブスクリプションを作成するには、クライアントがいくつかのことを行う必要があります。以下のコードで見ることができるAMQP「ContainerIDの」プロパティを使用してクライアントのためのユニークな「クライアントID」を設定し

  1. 。クライアントは、接続するたびにその同じコンテナIDを使用し、永続サブスクリプションを回復する必要があります。

  2. 新しいセッションを作成します。

  3. 登録するアドレス(ここではトピック)の新しい受信者を作成します。恒久サブスクリプションのソースは、アドレスがトピックアドレス(ActiveMQではtopic:// name)に設定されている必要があります。 Sourceには、EXPWRAYポリシーがNEVERに設定されている必要があります。また、Sourceには、UNSETTLED_STATEに設定された端末の耐久性状態と、COPYに設定された配布モードも必要です。

  4. 受信者が作成されたら、startのonMessageハンドラを設定するか、またはreceiveを呼び出してメッセージを消費することができます(ブローカに送信するクレジットを与えたと仮定します)。


using System; 
using Amqp; 
using Amqp.Framing; 
using Amqp.Types; 
using Amqp.Sasl; 
using System.Threading;

namespace aorg.apache.activemq.examples { class Program { private static string DEFAULT_BROKER_URI = "amqp://localhost:5672"; private static string DEFAULT_CONTAINER_ID = "client-1"; private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription"; private static string DEFAULT_TOPIC_NAME = "test-topic";

static void Main(string[] args) { Console.WriteLine("Starting AMQP durable consumer example."); Console.WriteLine("Creating a Durable Subscription"); CreateDurableSubscription(); Console.WriteLine("Attempting to recover a Durable Subscription"); RecoverDurableSubscription(); Console.WriteLine("Unsubscribe a durable subscription"); UnsubscribeDurableSubscription(); Console.WriteLine("Attempting to recover a non-existent durable subscription"); try { RecoverDurableSubscription(); throw new Exception("Subscription was not deleted."); } catch (AmqpException) { Console.WriteLine("Recover failed as expected"); } Console.WriteLine("Example Complete."); } // Creating a durable subscription involves creating a Receiver with a Source that // has the address set to the Topic name where the client wants to subscribe along // with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the // Distribution Mode set to 'Copy'. The link name of the Receiver represents the // desired name of the Subscription and of course the Connection must carry a container // ID uniqure to the client that is creating the subscription. private static void CreateDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source source = CreateBasicSource(); // Create a Durable Consumer Source. source.Address = DEFAULT_TOPIC_NAME; source.ExpiryPolicy = new Symbol("never"); source.Durable = 2; source.DistributionMode = new Symbol("copy"); ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null); session.Close(); } finally { connection.Close(); } } // Recovering an existing subscription allows the client to ask the remote // peer if a subscription with the given name for the current 'Container ID' // exists. The process involves the client attaching a receiver with a null // Source on a link with the desired subscription name as the link name and // the broker will then return a Source instance if this current container // has a subscription registered with that subscription (link) name. private static void RecoverDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } session.Close(); } finally { connection.Close(); } } // Unsubscribing a durable subscription involves recovering an existing // subscription and then closing the receiver link explicitly or in AMQP // terms the close value of the Detach frame should be 'true' private static void UnsubscribeDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } // Closing the Receiver vs. detaching it will unsubscribe receiver.Close(); session.Close(); } finally { connection.Close(); } } // Creates a basic Source type that contains common attributes needed // to describe to the remote peer the features and expectations of the // Source of the Receiver link. private static Source CreateBasicSource() { Source source = new Source(); // These are the outcomes this link will accept. Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"), new Symbol("amqp:rejected:list"), new Symbol("amqp:released:list"), new Symbol("amqp:modified:list") }; // Default Outcome for deliveries not settled on this link Modified defaultOutcome = new Modified(); defaultOutcome.DeliveryFailed = true; defaultOutcome.UndeliverableHere = false; // Configure Source. source.DefaultOutcome = defaultOutcome; source.Outcomes = outcomes; return source; } }

}

+0

すごいです!ただし、RecoverDurableSubscription()メソッドには、グローバルな副作用があります。これは、関連するJMSセレクターを変更することです。これにより、プラットフォーム上の現在のアクティビティが中断される可能性があります。関連するJMSセレクタを変更または認識することなく、既存のReceiverLink(永続サブスクライバ)を取得する方法はありますか? –

関連する問題