2016-10-19 6 views
1

ActiveMQを使用してアプリケーションを作成すると、iamは非同期onMessage()メソッドを使用してActiveMQからメッセージを取得します。 私はすべてのメッセージがOnMessage()メソッドのConcurrentLinkedQueueに格納されるように、activemqから1000のメッセージを取得し、スレッドを使用してConcurrentLinkedQueueから取得すると仮定します。 しかし、私が直面している問題は、iamがConcurrentLinkedQueueとの間で1つのメッセージを追加したり取得したりすることができず、onMessage()のtextMessageがsetterメソッドに送信され、textMessageを受け取りますが、これを避けるには?私はいけないActiveMQ OnMessage()メソッドは他のスレッドをブロックします

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener { 

      @Override 
      public void onMessage(Message message) { 
       //Setting the text message to a setter which takes TextMessage as arg 
       settext.setTextmessage((TextMessage) message); 
      } 
     } 
    } 

//Problem here unable to produce 
class Producer implements Runnable { 

    ConcurrentLinkedQueue<TextMessage> queue; 
    Settext settext; 
    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext){ 
     this.queue = queue2; 
     this.settext=settext; 
    } 

    public void run() { 
     System.out.println("Producer Started"); 
     try { 
      if(this.settext.getTextmessage()!=null) 
      { 
       //Add to ConcurrentLinkedQueue 
       queue.add(this.settext.getTextmessage()); 
      } 
      Thread.currentThread().sleep(200); 
      //} 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 
} 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) != null) { 
      System.out.println("Removed: " + str); 

     } 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     //} 
    } 

答えて

2

以下のよう

コードスニペットはQueueMessageListenerが非同期に実行されることに注意してください、あなたがこれを行うが、あなたのデザインの問題がある理由を、以下の注意事項1 - 5を参照してください知っているし、それは変更することができていますsettext.setTextmessage((TextMessage) message);別の消費者はのTextMessageを取得し、これは多分V2は良いですが、多分org.springframework.jms.listener.DefaultMessageListenerContainerが最善の解決策で使用するために、キューに追加する前に:

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a producer 
// 1- settext.getTextmessage() == null i suppose at this level, see 2- point 
     Thread producer = new Thread(new Producer(queue,settext)); 
     producer.start(); 
//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     // 3- you start consumers go to 4, note that you will only consume count messages !! 
     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener { 

      @Override 
      public void onMessage(Message message) { 
       //Setting the text message to a setter which takes TextMessage as arg 
       settext.setTextmessage((TextMessage) message); 
       // at this point message is considered as delivered if sessionAcknowledgeModeName is AUTO_ACKNOWLEDGE and maybe lost if asynchronous treatment fails 
      } 
     } 
    } 

//Problem here unable to produce 
class Producer implements Runnable { 

    ConcurrentLinkedQueue<TextMessage> queue; 
    Settext settext; 
    Producer(ConcurrentLinkedQueue<TextMessage> queue2, Settext settext){ 
     this.queue = queue2; 
     this.settext=settext; 
    } 

    public void run() { 
     System.out.println("Producer Started"); 
     try { 
     // 2- settext.getTextmessage() == null if block is not executed and thread will sleep and finish 
    // you have to add this   
      while (this.settext.getTextmessage() == null) { 
      try { 
       Thread.currentThread().sleep(500); 
      } catch (Exception ex) { 
       ex.printStackTrace(); 
      } 
      } 
      if(this.settext.getTextmessage()!=null) 
      { 
       //Add to ConcurrentLinkedQueue 
       queue.add(this.settext.getTextmessage()); 
      } 
      //} 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 
} 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     // 4- queue.poll() == null at this level, while loop finished, thread will sleep and finish 
    // you have to add this 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V2:

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     for (int i = 0; i <count; i++) { 
      executor.execute(new Consumer(queue)); 
     } 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener { 

      @Override 
      public void onMessage(Message message) { 
       queue.add((TextMessage) message); 
      } 
     } 
    } 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while ((str = queue.poll()) == null) { 
     try { 
      Thread.currentThread().sleep(500); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
     } 
     System.out.println("Removed: " + str); 
     //} 
    } 

V3:

public static void main(String[] args) throws InterruptedException, JMSException { 

//Create a Consumer with coresize 4 and Max size 10 
     final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 10, 100, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); 
     executor.allowCoreThreadTimeOut(true); 

     **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

     consumer.setMessageListener(new QueueMessageListener()); 
     executor.shutdown(); 
    } 

private static class QueueMessageListener implements MessageListener { 

      @Override 
      public void onMessage(Message message) { 
       executor.execute(new Consumer((TextMessage) message)); 
      } 
     } 
    } 

//Problem here unable to consume 
class Consumer implements Runnable { 
    TextMessage textMessage; 

    public Consumer(TextMessage textMessage) { 
     this.textMessage = textMessage; 
    } 
    public void run() { 
     System.out.println("Removed: " + str); 
    } 
} 

V4:ActiveMQのproducer.I 100からメッセージを送信

public static void main(String[] args) throws InterruptedException, JMSException { 

    new Consumer(queue).start(); 

    **//INITIALIZE ACTIVEMQ CONFIGURATION HERE** 

    consumer.setMessageListener(new QueueMessageListener()); 
    executor.shutdown(); 
} 

private static class QueueMessageListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     queue.add((TextMessage) message); 
    } 
} 

//Problem here unable to consume 
class Consumer implements Runnable { 
    ConcurrentLinkedQueue<TextMessage> queue; 

    public Consumer(ConcurrentLinkedQueue<TextMessage> queue2) { 
     this.queue = queue2; 
    } 
    public void run() { 
     TextMessage str; 
     System.out.println("Consumer Started"); 
     while (true) { 
      try { 
       Thread.currentThread().sleep(500); 
      } catch (Exception ex) { 
      } 
      while ((str = queue.poll()) == null) { 
       try { 
        Thread.currentThread().sleep(500); 
       } catch (Exception ex) { 
       } 
      } 
      System.out.println("Removed: " + str); 
     } 
    } 
} 
+0

IAMがコードのV2を試みたが、私は( "削除" するSystem.out.println + str); 10回しか印刷しません。つまり、キューから10個の要素しか削除されません。iamが間違っていると私を訂正します。 –

+0

これは、変数のカウント== 10を意味しますか? –

+0

多分V3はより柔軟です –

関連する問題