Active MQパブリッシャとサブスクライバをC#で実装しています。私は、Apache.NMS.ActiveMQ .netクライアントライブラリを使用して、ブローカと通信しています。フェイルオーバーセットアップでActiveMQの未確認メッセージがサブスクライバに再配信されない
<package id="Apache.NMS" version="1.7.1" targetFramework="net461" />
<package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" />
ActiveMQのは、4台のサーバー(0.225、0.226、0.346、0.347 - 参照用IPの最後部)にセットアップフェイルオーバーで構成されています。 // TCP://103.24.34.225:61616、TCP://103.24.34.226:61616、TCP://103.24.34.346:61616、TCP://ブローカーのURLが
フェイルオーバーのようになりますここで103.24.34.347:61616
は、私がここで
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
var connectionFactory = new ConnectionFactory(brokerUrl);
using (var connection = connectionFactory.CreateConnection("conn1", "conn$34"))
{
connection.ClientId = "TESTPUBLISHER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.Start();
var session = connection.CreateSession();
var topic = new ActiveMQTopic("ACCOUNT.UPDATE");
var producer = session.CreateProducer(topic);
var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST";
var textMessage = producer.CreateTextMessage(msg);
textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE");
textMessage.Properties.SetString("Action", "UPDATE");
textMessage.Properties.SetString("DataContractType", "Account");
producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0));
}
を公開していますどのように私は、トピックに加入しています方法です。このコードは、複数の共有加入者が着信メッセージを受信できるように設定されています。私はそれを達成するためにバーチャルトピックを使用しなければならないと言われました。だから私は仮想トピックを使用するように加入者を設定し、Windowsサービスプロジェクト内でホストされます。私は肯定応答モードを使用してClientAcknowledgeになっているので、メッセージが確認されない限り、メッセージは返され続けます。以下のコードスニペットは、Windowsサービスの重要なサブスクライバ部分を表しています。
var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616";
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl));
IConnection connection = factory.CreateConnection("conn1", "conn$34"))
connection.ClientId = "TESTSUBSCRIBER";
connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
connection.ConnectionInterruptedListener += OnConnectionInturrupted;
connection.ExceptionListener += OnConnectionExceptionListener;
connection.ConnectionResumedListener += OnConnectionResumedListener;
connection.Start();
ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE");
ActiveMQTopic topic = new ActiveMQTopic();
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += OnMessage;
private void OnMessage(IMessage message)
{
var payload = ((ITextMessage)message).Text;
Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]");
if(payload != "43342_test")
{
message.Acknowledge();
Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]");
}
}
private void OnConnectionResumedListener()
{
Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER");
}
private void OnConnectionExceptionListener(Exception exception)
{
Log.Error(exception);
}
private void OnConnectionInturrupted()
{
Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER");
}
私はパブリッシュおよびサブスクライバメッセージを発行できます。私はある特定のケースで問題に遭遇しています。サブスクライバが、フェールオーバーサーバーのプールから(.225ブローカーサーバーへの)接続を確立したとします。サイト運営者がメッセージを公開しました。加入者はそれを受け取り、それは処理の途中です。しかし、いくつかのサーバーパッチのメンテナンスのために、Windowsサービスはシャットダウンする必要がありました。その結果、ブローカへのサブスクライバ接続が切断されました。 Windowsサービスが復旧したとき、この時間のサブスクライバは、フェールオーバープールから別のブローカサーバ(.346ブローカサーバ)への接続を確立しました。それが起こったとき、未確認のメッセージは決して再配信されませんでした。しかし、私がWindowsサービスを再起動して何らかの不運があった場合.225ブローカー(加入者が最初に接続されたサーバー)に接続が確立された場合、今すぐ加入者は未確認のメッセージを受信します。
ActiveMQがフェイルオーバー設定で構成されている場合、フェイルオーバープールからどのブローカサーバーに接続を確立しても、未確認のメッセージが常に受信されるはずです。
状況によっては、フェイルオーバーの設定が機能しているようです。サブスクライバがフェールオーバープールから.346ブローカサーバに接続されたと仮定します。パブリッシャは、同じプールから異なるブローカサーバ(.225ブローカ)に接続され、メッセージを発行し、ユーザはメッセージを受信しています。これは、フェールオーバーセットアップが機能していることを証明します。
ただし、サブスクライバがブローカサーバからメッセージを受信し、そのメッセージを確認する前にサブスクライバが切断されている場合は、未確認メッセージを受信するために同じブローカサーバへの接続を再確立する必要があります。これは私には分かりません。
このユースケースを動作させるためにActive MQサーバの設定に追加の設定が必要ですか?