2013-01-23 23 views
9

EDIT:質問の言い換え:ActiveMQと組み込みブローカー

ActiveMQをサーバーとクライアントアプリケーション間のメッセンジャーサービスとして使用したいと考えています。

私のクライアントが消費するために生成されたメッセージを処理するために、サーバー内に組み込みブローカー(つまり別のプロセスではない)を設定しようとしています。このキューは永続化されます。

ブローカーの初期化を次のように

BrokerService broker = new BrokerService(); 
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(); 
adaptor.setDirectory(new File("activemq")); 
broker.setPersistenceAdapter(adaptor); 
broker.setUseJmx(true); 
broker.addConnector("tcp://localhost:61616"); 
broker.start(); 

をいじった後、私は、サーバーの一部のビーイングになってしまった:

public static class HelloWorldProducer implements Runnable { 
    public void run() { 
     try { 
      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need 
      Connection connection = connectionFactory.createConnection(); 
      connection.start(); 
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Destination destination = session.createQueue("TEST.FOO"); 
      MessageProducer producer = session.createProducer(destination); 
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
      String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); 
      TextMessage message = session.createTextMessage(text); 
      System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); 
      producer.send(message); 
      session.close(); 
      connection.close(); 
     } 
     catch (Exception e) { 
      System.out.println("Caught: " + e); 
      e.printStackTrace(); 
     } 
    } 
} 

クライアントが非常に似ており、次のようになります。

メインメソッドは、スレッド内でこれらのそれぞれを開始するだけで、メッセージの生成/受信を開始しますges。

...しかし、私は、各スレッドの開始と次のように実行しています:

2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost) 
2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost 
2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state 
2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: [] 
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService 
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry 
    java.rmi.server.ExportException: internal error: ObjID already in use 
    at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186) 
    at sun.rmi.transport.Transport.exportObject(Transport.java:92) 
    at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247) 
    at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411) 
    at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147) 
     <snip....> 

メッセージが生成されているように思え、正常に消費する(私は以前について投稿し、他の問題が解決されました)、上記の例外は私を心配しています。

EDIT:ブローカーのシャットダウン時には、私は今も、以下に迎えています:

2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception: 
    java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) 
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210) 
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202) 
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) 
    at java.lang.Thread.run(Thread.java:722) 
+0

設定した内容をすべて確認できるように、すべてのブローカ作成コードを含める必要があります。 broker.setUseJmx(false)を使用してJMXを必要としない場合は、JMXを無効にすることができます。 –

+0

明確にするため、表示されるメッセージのレベルはDEBUGです。これは必ずしもエラーではありません。それはちょうど参考になるかもしれません。実際にメッセージを生成/消費する際にエラーが発生していますか?この点についての質問は明確ではありません。 – cmonkey

+0

私は質問を完全に言い直しました。基本的に私は3つのサブ質問をしています。 (1)例外、(2)失われたメッセージ、(3)持続性。私の質問を見てくれてありがとう。 –

答えて

11

あなたはhere文書化されているその多くは多くの方法であなたのコードにブローカーを埋め込むことができます。新しいバージョンのKahaDBストアではなく、現在廃止予定のAMQストアをデフォルトにしているため、使用しているバージョンがかなり古いように見えるので、バージョンをアップグレードしてみてください。 VMブローカで作成しようとする可能性のある別の接続ファクトリを使用するため、クライアントスレッド間の競争のために問題が発生する可能性があります。プロデューサでcreate = falseオプションを設定して、問題が解決した後でコンシューマスレッドが起動していることを確認した場合、またはVMブローカを事前に作成して、スレッドとスレッドにadd create = falseを設定すると、

BrokerService broker = new BrokerService(); 
// configure the broker 
broker.setBrokerName("localhost"); 
broker.setUseJmx(false); 
broker.start(); 

クライアントコードでは、この接続ファクトリの設定で接続します。

私はあなたのコードを実行すると、私は以下の例外だ
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); 
+0

ありがとう、私はそれを持っています。しかし、私はまだ上記の例外について心配しています。それが安全に無視できるかどうか知っていますか? –

+1

クライアントがまだ接続されている間にブローカをシャットダウンしたため、クライアント接続が失敗したという例外があります。アプリをシャットダウンしている場合は大きな問題ではありません。あなたのクライアントは、接続ファクトリでVMの代わりにTCPを使用しているようですが、質問から完全にはっきりとは分かりません。 –

+0

はい...それは確かに問題であり、例外は安全に無視できます。これを回避するには、クライアント(ブローカを含む)の前にクライアントをシャットダウンする必要があります。 –

4

:あなたのブローカーが実行され、ポート61616に耳を傾け、そのブローカーに接続しようとするすべてのクライアントがポートを持っている必要がありますされ

javax.jms.JMSException: Could not connect to broker URL: tcp://localhost. 
Reason java.lang.IllegalArgumentException: port out of range:-1 

をそのURLに

クライアントコードはlocalhostに接続しようとしますが、接続先のポートについては言及していません。 プロデューサコードとコンシューマコードの両方を修正する必要があります。

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); 

ポートを固定した後、私はあなたのコードを実行することができました。

+0

ありがとうSatish。私はこれを得ることができました。謝罪、私は質問を閉めておくべきだった。 –

関連する問題