EDIT:質問の言い換え:ActiveMQと組み込みブローカー
ActiveMQをサーバーとクライアントアプリケーション間のメッセンジャーサービスとして使用したいと考えています。
私のクライアントが消費するために生成されたメッセージを処理するために、サーバー内に組み込みブローカー(つまり別のプロセスではない)を設定しようとしています。このキューは永続化されます。
ブローカーの初期化を次のように
BrokerService broker = new BrokerService();
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter();
adaptor.setDirectory(new File("activemq"));
broker.setPersistenceAdapter(adaptor);
broker.setUseJmx(true);
broker.addConnector("tcp://localhost:61616");
broker.start();
をいじった後、私は、サーバーの一部のビーイングになってしまった:
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost"); // apparently the vm part is all i need
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
クライアントが非常に似ており、次のようになります。
メインメソッドは、スレッド内でこれらのそれぞれを開始するだけで、メッセージの生成/受信を開始しますges。
...しかし、私は、各スレッドの開始と次のように実行しています:
2013-01-24 07:54:31,271 INFO [org.apache.activemq.broker.BrokerService] Using Persistence Adapter: AMQPersistenceAdapter(activemq-data/localhost)
2013-01-24 07:54:31,281 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] AMQStore starting using directory: activemq-data/localhost
2013-01-24 07:54:31,302 INFO [org.apache.activemq.kaha.impl.KahaStore] Kaha Store using data directory activemq-data/localhost/kr-store/state
2013-01-24 07:54:31,339 INFO [org.apache.activemq.store.amq.AMQPersistenceAdapter] Active data files: []
2013-01-24 07:54:31,445 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Probably not using JRE 1.4: mx4j.tools.naming.NamingService
2013-01-24 07:54:31,450 DEBUG [org.apache.activemq.broker.jmx.ManagementContext] Failed to create local registry
java.rmi.server.ExportException: internal error: ObjID already in use
at sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:186)
at sun.rmi.transport.Transport.exportObject(Transport.java:92)
at sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:247)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
<snip....>
メッセージが生成されているように思え、正常に消費する(私は以前について投稿し、他の問題が解決されました)、上記の例外は私を心配しています。
EDIT:ブローカーのシャットダウン時には、私は今も、以下に迎えています:
2013-01-25 08:40:17,486 DEBUG [org.apache.activemq.transport.failover.FailoverTransport] Transport failed with the following exception:
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:722)
設定した内容をすべて確認できるように、すべてのブローカ作成コードを含める必要があります。 broker.setUseJmx(false)を使用してJMXを必要としない場合は、JMXを無効にすることができます。 –
明確にするため、表示されるメッセージのレベルはDEBUGです。これは必ずしもエラーではありません。それはちょうど参考になるかもしれません。実際にメッセージを生成/消費する際にエラーが発生していますか?この点についての質問は明確ではありません。 – cmonkey
私は質問を完全に言い直しました。基本的に私は3つのサブ質問をしています。 (1)例外、(2)失われたメッセージ、(3)持続性。私の質問を見てくれてありがとう。 –