2017-02-07 11 views
1

Active MQパブリッシャとサブスクライバをC#で実装しています。私は、Apache.NMS.ActiveMQ .netクライアントライブラリを使用して、ブローカと通信しています。フェイルオーバーセットアップでActiveMQの未確認メッセージがサブスクライバに再配信されない

<package id="Apache.NMS" version="1.7.1" targetFramework="net461" /> 
    <package id="Apache.NMS.ActiveMQ" version="1.7.2" targetFramework="net461" /> 

ActiveMQのは、4台のサーバー(0.225、0.226、0.346、0.347 - 参照用IPの最後部)にセットアップフェイルオーバーで構成されています。 // TCP://103.24.34.225:61616、TCP://103.24.34.226:61616、TCP://103.24.34.346:61616、TCP://ブローカーのURLが

フェイルオーバーのようになりますここで103.24.34.347:61616

は、私がここで

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
    var connectionFactory = new ConnectionFactory(brokerUrl); 
    using (var connection = connectionFactory.CreateConnection("conn1", "conn$34")) 
    { 
     connection.ClientId = "TESTPUBLISHER"; 
     connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
     connection.Start(); 

     var session = connection.CreateSession(); 
     var topic = new ActiveMQTopic("ACCOUNT.UPDATE"); 
     var producer = session.CreateProducer(topic); 


     var msg = "43342_test"; //DateTime.Now.ToString("yyyyMdHHmmss_fff") + "-TEST"; 
     var textMessage = producer.CreateTextMessage(msg); 

     textMessage.Properties.SetString("Topic", "ACCOUNT.UPDATE"); 
     textMessage.Properties.SetString("Action", "UPDATE"); 
     textMessage.Properties.SetString("DataContractType", "Account"); 

     producer.Send(textMessage, MsgDeliveryMode.Persistent, MsgPriority.Normal, new TimeSpan(0, 0, 60, 0, 0)); 

    } 

を公開していますどのように私は、トピックに加入しています方法です。このコードは、複数の共有加入者が着信メッセージを受信できるように設定されています。私はそれを達成するためにバーチャルトピックを使用しなければならないと言われました。だから私は仮想トピックを使用するように加入者を設定し、Windowsサービスプロジェクト内でホストされます。私は肯定応答モードを使用してClientAcknowledgeになっているので、メッセージが確認されない限り、メッセージは返され続けます。以下のコードスニペットは、Windowsサービスの重要なサブスクライバ部分を表しています。

var brokerUrl = "failover://tcp://103.24.34.225:61616,tcp://103.24.34.226:61616,tcp://103.24.34.346:61616,tcp://103.24.34.347:61616"; 
IConnectionFactory factory = new ConnectionFactory(new Uri(brokerUrl)); 
IConnection connection = factory.CreateConnection("conn1", "conn$34")) 

    connection.ClientId = "TESTSUBSCRIBER"; 
    connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge; 
    connection.ConnectionInterruptedListener += OnConnectionInturrupted; 
    connection.ExceptionListener += OnConnectionExceptionListener; 
    connection.ConnectionResumedListener += OnConnectionResumedListener; 
    connection.Start(); 

    ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge); 
    var queue = new ActiveMQQueue("VT.TESTSUBSCRIBER.ACCOUNT.UPDATE"); 
    ActiveMQTopic topic = new ActiveMQTopic(); 
    IMessageConsumer consumer = session.CreateConsumer(queue); 

    consumer.Listener += OnMessage; 



private void OnMessage(IMessage message) 
{ 
    var payload = ((ITextMessage)message).Text; 
    Log.Info($"Received message for Client TESTSUBSCRIBER - [{payload}]"); 
    if(payload != "43342_test") 
    { 
     message.Acknowledge(); 
      Log.Info($"Message acknowledged for Client TESTSUBSCRIBER - [{payload}]"); 
    } 
} 

private void OnConnectionResumedListener() 
{ 
    Log.Info($"Subscriber connection resumed for Client TESTSUBSCRIBER"); 
} 

private void OnConnectionExceptionListener(Exception exception) 
{ 
    Log.Error(exception); 
} 

private void OnConnectionInturrupted() 
{ 
    Log.Error($"Subscriber connection interrupted for Client TESTSUBSCRIBER"); 
} 

私はパブリッシュおよびサブスクライバメッセージを発行できます。私はある特定のケースで問題に遭遇しています。サブスクライバが、フェールオーバーサーバーのプールから(.225ブローカーサーバーへの)接続を確立したとします。サイト運営者がメッセージを公開しました。加入者はそれを受け取り、それは処理の途中です。しかし、いくつかのサーバーパッチのメンテナンスのために、Windowsサービスはシャットダウンする必要がありました。その結果、ブローカへのサブスクライバ接続が切断されました。 Windowsサービスが復旧したとき、この時間のサブスクライバは、フェールオーバープールから別のブローカサーバ(.346ブローカサーバ)への接続を確立しました。それが起こったとき、未確認のメッセージは決して再配信されませんでした。しかし、私がWindowsサービスを再起動して何らかの不運があった場合.225ブローカー(加入者が最初に接続されたサーバー)に接続が確立された場合、今すぐ加入者は未確認のメッセージを受信します。

ActiveMQがフェイルオーバー設定で構成されている場合、フェイルオーバープールからどのブローカサーバーに接続を確立しても、未確認のメッセージが常に受信されるはずです。

状況によっては、フェイルオーバーの設定が機能しているようです。サブスクライバがフェールオーバープールから.346ブローカサーバに接続されたと仮定します。パブリッシャは、同じプールから異なるブローカサーバ(.225ブローカ)に接続され、メッセージを発行し、ユーザはメッセージを受信して​​います。これは、フェールオーバーセットアップが機能していることを証明します。

ただし、サブスクライバがブローカサーバからメッセージを受信し、そのメッセージを確認する前にサブスクライバが切断されている場合は、未確認メッセージを受信するために同じブローカサーバへの接続を再確立する必要があります。これは私には分かりません。

このユースケースを動作させるためにActive MQサーバの設定に追加の設定が必要ですか?

答えて

0

この問題に対する解決策は、クライアント側ではなく、アクティブMQサーバー構成で行われました。

プロデューサフロー制御先ポリシーの場合は、ConditionalNetworkBridgeFilterを追加し、replayWhenNoConsumersを有効にします。

関連する問題