2017-05-09 12 views
3

RabbitMQ、JavaとPythonの混在コンポーネントでヘッダー交換を使用しようとしています。Rabbitmqヘッダーの交換と確認済みの配送

私はpython(pika)とjavaクライアントとは異なる動作をするようです。 Pythonで

channel.exchange_declare(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers', 
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True) 
channel.confirm_delivery() 
result = channel.basic_publish(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ routing_key='', 
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True, 
¦ ¦ ¦ ¦ ¦ ¦ body=message, 
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2, 
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers)) 

ヘッダは、バインドされた消費者とのメッセージをルーティングすることはできません一致しない場合は、結果は

偽が、Java/Scalaである:

channel.exchangeDeclare("headers_test", "headers", true, false, null) 
channel.confirmSelect 

val props = MessageProperties.PERSISTENT_BASIC.builder 
¦ ¦ ¦ ¦ .headers(messageHeaders).build 
channel.basicPublish("headers_test", 
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey 
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory 
¦ ¦ ¦ ¦ ¦ ¦props, 
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes) 
channel.waitForConfirmsOrDie() 

ここで、メッセージヘッダーで一致が見つからない場合、メッセージはエラーなしで破棄されたようです

何か不足していますか、どちらのクライアントの動作が実際に異なりますか?そして、どのように私はjavaでヘッダー交換を使用して確認配信を得ることができますか?

注:キューのルーティング設定にはすでに「複雑な」交換がありますが、デッドレターのルーティングをゲームに追加することは避けたいだけです。

答えて

1

ヘッダーに一致するキューがなくてもメッセージが確認されたとする問題。ドキュメント(https://www.rabbitmq.com/confirms.html)から: 交換は任意のキュー(待ち行列の空の リストを返します)にメッセージしませんルートを確認したら、ルーティング不能なメッセージについて

、ブローカーは確認を発行します。メッセージが必須として公開されている場合は、basic.ackの前に basic.returnがクライアントに送信されます。否定応答(basic.nack)の場合も同じです( )。

代わりに、メッセージがルーティングされているかどうかを検出するためにbasic.returnメッセージをチェックする必要があります。

私はwiresharkで確認しましたが、実際にはメッセージがルーティングされていないとAMQP basic.returnメッセージが表示されます。

私はメッセージが私はこの取得ルーティングされていない場合は、

channel.addReturnListener(new ReturnListener() { 
    @Override 
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("App.handleReturn"); 
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
    } 
}); 

そして実際に開始する必要がありますsupppose:

replyCode = [312]、replyText = [NO_ROUTE]、交換をあなたはJavaでピカの同期動作をエミュレートしたい場合= [headers_logs]、 routingKey = []、プロ....

さらに、それはあなたがDできるようですそれは、メッセージを発行する前に現在の公開順序番号を取得し、.waitForConfirmsOrDie()に頼るのではなく、確認リスナーを登録することによって行われます。

だから、完全なコードサンプルは、次のようになります。

channel.addReturnListener(new ReturnListener() { 
     @Override 
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     System.out.println("App.handleReturn"); 
     System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
     } 
    }); 

    channel.addConfirmListener(new ConfirmListener() { 
     @Override 
     public void handleAck(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleAck"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 

     @Override 
     public void handleNack(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleNack"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 
}); 

long nextPublishSeqNo = channel.getNextPublishSeqNo(); 
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo); 

channel.basicPublish("headers_logs", 
    "", 
    true, 
    props, 
    "data".getBytes()); 

とリターン/確認コールバックの内側に、あなたがメッセージを公開する前に得たチャネルの公開シーケンス番号を調べる必要があります。

メッセージがどのキューにもルーティングされていない場合、RabbitMqは確認(配信タグ)を含む1つのbasic.returnメッセージを返します。メッセージがルーティングされている場合、RabbitMqはbacic.ackという単一のメッセージを返信します。このメッセージには確認も含まれています。

とてもメッセージがルーティングされたかどうかを決定するためのロジックは、このことができ、)(RabbitMQのJavaクライアントは常にbasicConfirm前basicReturn()コールバックを呼び出すようだ:

登録は上のリスナーを返すと確認チャネル; チャネルの次の公開シーケンス番号を記憶します。 リターンコールバックまたは確認コールバックのいずれかを待ちます。リターンコールバックの場合、メッセージはルーティングされていないので、同じ配信タグの確認は無視してください。 handleReturn()を受け取る前にhandleAck()コールバックを受け取ると、メッセージがキューにルーティングされたことを意味します。

どのような場合でもハンドラック()を呼び出すことはできません。

関連する問題