2017-10-09 11 views
1

が解決取得しています:RabbitMQに直接返信してください。私はAlreadyClosedException

channel.basicPublishを移動します( ""、QUEUE、小道具、message.getBytes());

channel.basicConsume以下

(REPLYQUEUE、...)問題を修正


私はRabbitMQダイレクト返信機能を使用する方法を理解しようとしています。 documentationは実装方法がかなり曖昧なので、RPCの例を使用して、直接reply-toを使用するようにしました。

private final static String QUEUE = "Test_chan"; 
private void directReplyToClient(ConnectionFactory factory) { 
    Connection connection = null; 
    Channel channel = null; 
    String replyQueue; 

    try { 
     connection = factory.newConnection(); 
     channel = connection.createChannel(); 

     //replyQueue = channel.queueDeclare().getQueue(); 
     replyQueue = "amq.rabbitmq.reply-to"; 
     AMQP.BasicProperties props = new AMQP.BasicProperties 
       .Builder() 
       .replyTo(replyQueue) 
       .build(); 
     String message = "Hello World"; 
     channel.basicPublish("", QUEUE, props, message.getBytes()); 

     final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); 

     channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) { 
      @Override 
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { 

       response.offer(new String(body, "UTF-8")); 

      } 
     }); 

     System.out.println(response.take()); 

    } catch (IOException | TimeoutException | InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (channel != null) 
       channel.close(); 
      if (connection != null) 
       connection.close(); 
     } catch (IOException | TimeoutException _ignore) {} 
    } 
} 

channel.queueDeclare()。getQueue()

作品への返信アドレスを設定しますが、

に設定しますamq.rabbitmq.reply- 〜

スレッド「メイン」com.rabbitmq.client.AlreadyClosedExceptionで

例外:

は、次の例外を与える チャンネルはすでに、チャネルエラーのために閉じられています。プロトコル方式: 法(返信先コード= 406、返信テキストを= PRECONDITION_FAILED - 高速応答、消費者が存在しない場合、クラスID = 60、方法-ID = 40)

私はどこ誰が見るん何か間違っている?すべてのポインタが評価されます。

+0

例外が私の下にあるチャネルを閉じることを試みていることが分かりました。だから私は既に閉鎖されたチャンネルを閉鎖しようとしている。なぜそれは既に閉鎖されていますか? – Orjanp

+1

私は突然それを理解しました。 basicPublic呼び出しをbasicConsume呼び出しの下に移動するだけで、それは魅力的に機能します。 – Orjanp

答えて

1

これはソリューションのコードです。出版の前に消費してください。

private final static String QUEUE = "Test_chan"; 

private void directReplyToProducer(ConnectionFactory factory) { 
    Connection connection = null; 
    Channel channel = null; 
    String replyQueue; 

    try { 
     connection = factory.newConnection(); 
     channel = connection.createChannel(); 

     replyQueue = "amq.rabbitmq.reply-to"; 
     AMQP.BasicProperties props = new AMQP.BasicProperties 
       .Builder() 
       .replyTo(replyQueue) 
       .build(); 
     String message = "Hello World"; 

     final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); 
     System.out.println(" [x] Sent x'" + message + "'"); 

     channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) { 
      @Override 
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { 
       response.offer(new String(body, "UTF-8")); 
      } 
     }); 
     channel.basicPublish("", QUEUE, props, message.getBytes()); 

     System.out.println(response.take()); 
     Thread.sleep(10000); 

    } catch (IOException | TimeoutException | InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (channel != null) 
       channel.close(); 
      if (connection != null) 
       connection.close(); 
     } catch (IOException | TimeoutException _ignore) {} 
    } 
} 
関連する問題