2016-09-12 10 views
0

いくつかのワイルドカードサブスクリプション(例:/ A /#および/ B /#)を実行しています。各サブスクリプション(下記createSubscriber(topic)を参照)は約1000トピックになり、約10秒で戻ることができます。妥当な応答時間は10秒ですか?私にとっては遅いようですが、私はそれを比較することはありません。MQトピックのサブスクリプションが遅い。並列化がパフォーマンスを向上させない

以下のコードを与えます。

public class JMSClientSubscriber implements Runnable { 

    TopicConnection   topicCon; 
    Properties    properties; 
    List<MyListener>   listeners; 
    JmsTopicConnectionFactory jcf; 
    boolean     connected, alive; 

    public JMSClientSubscriber() throws JMSException { 
      properties = Properties.getInstance(); 
      listeners = new LinkedList<>(); 
      jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT); 
      connected = false; 
      alive = true; 
    } 

    @Override 
    public void run() { 
      try { 
        connect(); 
        while (alive) { 
          Thread.sleep(1000); 
        } 
        disconnect(); 
      } catch (Exception e) { 
        e.printStackTrace(); 
      } 
    } 

    void connect() throws Exception { 
      connected = false; 
      topicCon = jcf.createTopicConnection(); 

      topicCon.setExceptionListener(new ExceptionListener() { 
        @Override public void onException(JMSException arg0) { 
          disconnect(); 
          try { 
            Thread.sleep(1000); 
            connect(); 
          } catch (Exception e) { 
            e.printStackTrace(); 
          } 
        } 
      }); 

      topicCon.start(); 

      for (MyListener listener: listeners) { 
        Thread t = new Thread() { 
          @Override public void run() { 
            TopicSession topicSes; 
            try { 
              topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
              Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription()); 
              System.out.println(new Date() + " Subscribing to " + topic); 
    /* THIS TAKES 10 SECONDS! */   TopicSubscriber topicSub = topicSes.createSubscriber(topic); 
              System.out.println(new Date() + " Subscription finished " + topic); 
              topicSub.setMessageListener(listener); 
            } catch (Exception e) { 
              e.printStackTrace(); 
            } 
          } 
        }; 
        t.start(); 
      } 
      connected = true; 
    } 

    void disconnect() { 
      try { 
        connected = false; 
        if (topicCon != null) topicCon.close(); 
      } catch (JMSException e) {}  
    } 

    public void stop() { alive = false; } 

    public class MyListener implements MessageListener {   
      Class<? extends FlowMessage>  expectedClass; 
      FlowMessage      exampleMessage; 

      public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception { 
        this.expectedClass = expectedClass; 
        exampleMessage = expectedClass.newInstance(); 
        listeners.add(this); 
      } 

      @Override 
      public void onMessage(javax.jms.Message arg0) { 
        BytesMessage bm = (BytesMessage) arg0; 

        try { 
          byte bytes[] = new byte[(int) bm.getBodyLength()]; 
          bm.readBytes(bytes); 
          FlowMessage flowMessage = exampleMessage.newInstance(bytes); 
          System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString()); 

        } catch (Exception e) { 
          e.printStackTrace(); 
        } 
      } 
    } 


    public static void main(String[] args) throws Exception { 
      Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties")); 
      LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties"))); 

      /* Thread per connection */ 
      for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) { 
        JMSClientSubscriber s = new JMSClientSubscriber(); 
        s.new MyListener(clazz); 
        new Thread(s).start(); 
      } 

      /* Thread per session */ 
      JMSClientSubscriber s = new JMSClientSubscriber(); 
      s.new MyListener(KondorCpty.class); 
      s.new MyListener(KondorPair.class); 
      new Thread(s).start(); 

    } 

}このコードでmain

は、2つのテストを実行します。

つの接続+マルチスレッド/セッション

Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/# 
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/# 
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/# 
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/# 
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair 

マルチスレッド・コネクション+スレッド/接続

Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/# 
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/# 
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/# 
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/# 
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair 

ごとに1つのセッションの両方のテストは賢明同じタイミングワイズと行動です。

  • 〜1000年のトピックが10秒
  • サブスクリプション〜とるためのサブスクリプションは、彼らが異なるスレッドにあるにもかかわらず、シーケンシャルに実行するように見えます。
  • トピックの更新は、すべてのサブスクリプションが完了した後にのみ表示されます。
  • サブスクリプションの前または後にTopicConnection.start()を使用すると、パフォーマンスが変わらず、トピックの最初の更新が到着したときに違いがありません。

どのように私はこれをスピードアップしますか?

答えて

0

次の点に注意してください。

1)ごとのCreateSession(キューまたはトピックについて)メソッド呼び出し、データはJMSセッションの環境を設定するための両方のクライアントとキューマネージャーから流れ。あなたは、ネットワーク上のデータフローが関与していることを意味するリモート接続を使用しています。

2)createSubscriberメソッドの呼び出しには、サブスクリプションオブジェクトの作成、トピックルックアップとは別の一時的なキュー、キューマネージャの終了時の権限の検証などが含まれます。

どのように接続/セッションを並列化しているかを教えてください。

JMS仕様では、セッション間でセッションを共有しないでください。私はスレッド

1)JMS接続

2を作成し、各加入者)を作成するためのJMSセッション

3)1つのスレッド専用になること)加入者

4を作成JMS Connection.startはい()を使用してメッセージ配信を開始します。

+0

こんにちは、お返事いただきありがとうございます。私は完全なコードを含んでいます。 – lafual

+0

私の最初の提案は、サブスクライバが作成され、リスナがセットアップされた後にtopicCon.start()を移動することです。これにより、connection.start()が呼び出されたときにリスナーがメッセージを受信できる状態になっていることを確認し、メッセージングプロバイダにメッセージの配信を開始するように指示します。 – Shashi

+0

私はこれを試しました。変化なし。 (a)各サブスクリプションには10秒間のリターンが必要であり、(b)最初のトピックの更新は最終サブスクリプション完了後にのみ受信されます。 (参考 - JavaCl8でMQClient 7.5のJARを使用しています)。 – lafual

0

問題はonMessageにありました。ここでメッセージを処理する代わりに、私はBlockingQueueにメッセージを置きます。数多くの個別のスレッドがこれをポーリングしてBlockingQueueとなりました。これにより、MessageListenerのスループットが大幅に改善され、マルチスレッドの問題がJMS/MQコードから離されました。

関連する問題