2011-06-23 17 views
7

私はHornetQを初めて使用しています。コアapiを使用した後もHornetQメッセージがキューに残っています

私は、レイテンシと永続性が異なる(つまり、システムクラッシュに耐えられる)別のプロセス間で、サイズが約1kのメッセージを渡すことができるメッセージキューイングミドルウェアが必要です。私は同じキューに複数のプロセスを書き込んでいて、同様に複数のプロセスが同じキューから読み込むことになります。

これは、永続性のあるメッセージの受け渡しに最も適しているため、HornetQを選択しました。

私は現在、一人でサーバースタンドとしてHornetQのv2.2.2Finalをusungています。
は、私が正常にコアAPI(ClientSessionの)を使用し、耐久性/非耐久キューを作成し、成功し(ClientProducer)をキューにメッセージを投稿することができています。
同様に、core api (ClientConsumer)を使用してキューからメッセージを読み取ることができます。

この問題は、クライアントがメッセージを読み取ったときにメッセージがまだキューに残っている場合に発生します。つまり、キュー内のメッセージ数は一定のままです。。たぶん私はこれを間違っているが、私はこの印象の下でメッセージが消費されたら(read + ack)、それはキューから削除されます。私のケースでは起こっていない、同じメッセージがされている何度も繰り返し読んでください。

また、耐久性のないメッセージで耐久性のないキューを使用しようとしたことをお伝えしたいと思います。問題はのままです。私が使用していますプロデューサーのための

コード:

public class HQProducer implements Runnable { 

    private ClientProducer producer; 
    private boolean killme; 
    private ClientSession session; 
    private boolean durableMsg; 

    public HQProducer(String host, int port, String address, String queueName, 
      boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { 
     this.durableMsg = durableMsg; 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      if (queueExists(queueName)) { 
       if (deleteQ) { 
        System.out.println("Deleting existing queue :: " + queueName); 
        session.deleteQueue(queueName); 
        System.out.println("Creating queue :: " + queueName); 
        session.createQueue(address, queueName, true); 
       } 
      } else { 
       System.out.println("Creating new queue :: " + queueName); 
       session.createQueue(address, queueName, durable); 
      } 
      producer = session.createProducer(SimpleString.toSimpleString(address), pRate); 

      killme = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killme) { 
      try { 
       ClientMessage message = session.createMessage(durableMsg); 

       message.getBodyBuffer().writeString("Hello world"); 

       producer.send(message); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("Producer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killme) { 
     this.killme = killme; 
    } 

    private boolean queueExists(String qname) { 
     boolean res = false; 
     try { 
      //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); 
      QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); 
      if (queueQuery.isExists()) { 
       res = true; 
      } 
     } catch (HornetQException ex) { 
      res = false; 
     } 
     return res; 
    } 
} 

はまた、消費者のためのコードは次のとおりです。

public class HQConsumer implements Runnable { 

    private ClientSession session; 
    private ClientConsumer consumer; 
    private boolean killMe; 

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) { 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      session.start(); 

      consumer = session.createConsumer(queueName, "",0,-1,browseOnly); 

      killMe = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killMe) { 
      try { 
       ClientMessage msgReceived = consumer.receive(); 
       msgReceived.acknowledge(); 
       //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("ConSumer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killMe) { 
     this.killMe = killMe; 
    } 
} 

HornetQのサーバ設定::

<configuration xmlns="urn:hornetq" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> 

    <paging-directory>${data.dir:../data}/paging</paging-directory> 

    <bindings-directory>${data.dir:../data}/bindings</bindings-directory> 

    <journal-directory>${data.dir:../data}/journal</journal-directory> 

    <journal-min-files>10</journal-min-files> 

    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> 

    <connectors> 
     <connector name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </connector> 

     <connector name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     </connector> 
    </connectors> 

    <acceptors> 
     <acceptor name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </acceptor> 

     <acceptor name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     <param key="direct-deliver" value="false"/> 
     </acceptor> 
    </acceptors> 

    <security-settings> 
     <security-setting match="#"> 
     <permission type="createNonDurableQueue" roles="guest"/> 
     <permission type="deleteNonDurableQueue" roles="guest"/> 
     <permission type="createDurableQueue" roles="guest"/> 
     <permission type="deleteDurableQueue" roles="guest"/> 
     <permission type="consume" roles="guest"/> 
     <permission type="send" roles="guest"/> 
     </security-setting> 
    </security-settings> 

    <address-settings> 
     <!--default for catch all--> 
     <address-setting match="#"> 
     <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
     <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
     <redelivery-delay>0</redelivery-delay> 
     <max-size-bytes>10485760</max-size-bytes>  
     <message-counter-history-day-limit>10</message-counter-history-day-limit> 
     <address-full-policy>BLOCK</address-full-policy> 
     </address-setting> 
    </address-settings> 

</configuration> 
+0

/Cのための ありがとう:// docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354)あなたは同じことをやっているのですか? –

答えて

13

HornetQのコアAPIで明示的にメッセージを確認する必要があります。あなたのテストでどこが起こっているのか分かりません。

あなたが嫌がらせをしていない場合、これがメッセージがブロックされている理由です。私はあなたに完全な答えを与えるためにあなたの完全な例を見る必要があります。

また:あなたはとあなたのCreateSessionを定義する必要がありますのCreateSession(真、真、0)

コアAPIはバッチのACKへのオプションがあります。トランザクションセッションを使用していないため、serverLocatorで設定されたackBatchSizeに達するまで、サーバーに肯定応答は送信されません。これを実行すると、あなたのメッセージでacknowledge()を呼び出すとすぐに、どのackもサーバーに送られます。

現在使用しているオプションは、特定のDUPS_SIZEのJMS DUPS_OKに相当します。

(ポストはあなたといくつかの繰り返しの後に私の最初の答えを編集した)ackbatchsizeを設定

+1

'ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); '..私はコードに肯定応答しています –

+0

コアAPIにバッチACKオプションがあります。トランザクションセッションを使用していないため、serverLocatorで設定されたackBatchSizeに達するまで、サーバーに肯定応答は送信されません。 次のようにcreateSessionを定義する必要があります。 createSession(true、true、0); これで、あなたのメッセージでacknowledge()を呼び出すとすぐに、すべてのackがサーバーに送信されます。 –

+1

あなたはこのスレッドに戻りませんでした。だからあなたはあなたの問題を解決すると思いますか? –

2

は私が問題を解決する助け.. [この](HTTPに助け

+2

あなたはたぶんここの答えの一つに投票すべきだろう。 –

関連する問題