いくつかのワイルドカードサブスクリプション(例:/ A /#および/ B /#)を実行しています。各サブスクリプション(下記createSubscriber(topic)
を参照)は約1000トピックになり、約10秒で戻ることができます。妥当な応答時間は10秒ですか?私にとっては遅いようですが、私はそれを比較することはありません。MQトピックのサブスクリプションが遅い。並列化がパフォーマンスを向上させない
以下のコードを与えます。
public class JMSClientSubscriber implements Runnable {
TopicConnection topicCon;
Properties properties;
List<MyListener> listeners;
JmsTopicConnectionFactory jcf;
boolean connected, alive;
public JMSClientSubscriber() throws JMSException {
properties = Properties.getInstance();
listeners = new LinkedList<>();
jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT);
connected = false;
alive = true;
}
@Override
public void run() {
try {
connect();
while (alive) {
Thread.sleep(1000);
}
disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
void connect() throws Exception {
connected = false;
topicCon = jcf.createTopicConnection();
topicCon.setExceptionListener(new ExceptionListener() {
@Override public void onException(JMSException arg0) {
disconnect();
try {
Thread.sleep(1000);
connect();
} catch (Exception e) {
e.printStackTrace();
}
}
});
topicCon.start();
for (MyListener listener: listeners) {
Thread t = new Thread() {
@Override public void run() {
TopicSession topicSes;
try {
topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription());
System.out.println(new Date() + " Subscribing to " + topic);
/* THIS TAKES 10 SECONDS! */ TopicSubscriber topicSub = topicSes.createSubscriber(topic);
System.out.println(new Date() + " Subscription finished " + topic);
topicSub.setMessageListener(listener);
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
}
connected = true;
}
void disconnect() {
try {
connected = false;
if (topicCon != null) topicCon.close();
} catch (JMSException e) {}
}
public void stop() { alive = false; }
public class MyListener implements MessageListener {
Class<? extends FlowMessage> expectedClass;
FlowMessage exampleMessage;
public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception {
this.expectedClass = expectedClass;
exampleMessage = expectedClass.newInstance();
listeners.add(this);
}
@Override
public void onMessage(javax.jms.Message arg0) {
BytesMessage bm = (BytesMessage) arg0;
try {
byte bytes[] = new byte[(int) bm.getBodyLength()];
bm.readBytes(bytes);
FlowMessage flowMessage = exampleMessage.newInstance(bytes);
System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties"));
LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties")));
/* Thread per connection */
for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) {
JMSClientSubscriber s = new JMSClientSubscriber();
s.new MyListener(clazz);
new Thread(s).start();
}
/* Thread per session */
JMSClientSubscriber s = new JMSClientSubscriber();
s.new MyListener(KondorCpty.class);
s.new MyListener(KondorPair.class);
new Thread(s).start();
}
}このコードでmain
は、2つのテストを実行します。
つの接続+マルチスレッド/セッション
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair
マルチスレッド・コネクション+スレッド/接続
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair
ごとに1つのセッションの両方のテストは賢明同じタイミングワイズと行動です。
- 〜1000年のトピックが10秒
- サブスクリプション〜とるためのサブスクリプションは、彼らが異なるスレッドにあるにもかかわらず、シーケンシャルに実行するように見えます。
- トピックの更新は、すべてのサブスクリプションが完了した後にのみ表示されます。
- サブスクリプションの前または後にTopicConnection.start()を使用すると、パフォーマンスが変わらず、トピックの最初の更新が到着したときに違いがありません。
どのように私はこれをスピードアップしますか?
こんにちは、お返事いただきありがとうございます。私は完全なコードを含んでいます。 – lafual
私の最初の提案は、サブスクライバが作成され、リスナがセットアップされた後にtopicCon.start()を移動することです。これにより、connection.start()が呼び出されたときにリスナーがメッセージを受信できる状態になっていることを確認し、メッセージングプロバイダにメッセージの配信を開始するように指示します。 – Shashi
私はこれを試しました。変化なし。 (a)各サブスクリプションには10秒間のリターンが必要であり、(b)最初のトピックの更新は最終サブスクリプション完了後にのみ受信されます。 (参考 - JavaCl8でMQClient 7.5のJARを使用しています)。 – lafual