2013-07-15 23 views
11

basic.consumeを使用してキューに登録している単純なパブリッシャとコンシューマを作成しました。RabbitMqからのメッセージの受信を確認しない

私の消費者は、ジョブが例外なく実行されたときにメッセージを確認します。私が例外に遭遇するたびに、私はメッセージを確認せずに早く帰ります。確認応答されたメッセージだけがキューから消え、正常に動作します。
ここで、失敗したメッセージを再度消費者に送ってもらいたいのですが、そのメッセージを再確認する唯一の方法は、コンシューマを再起動することです。

このユースケースにどのようにアプローチする必要がありますか?

セットアップコード

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

コンシューマーコード

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

プロデューサーコード

$exchange->publish('message'); 
+0

どの言語を使用していますか、いくつかのコードを提供できますか? – pinepain

+0

@ zaq178miami、私の編集したメッセージを参照してください –

+0

@Bram_Gerritsen、私の答えの更新を参照してください – pinepain

答えて

15

メッセージワットの場合承認されず、アプリケーションが失敗すると自動的に再配信され、のプロパティはtrueに設定されます(no-ack = trueフラグで消費しない限り)。

UPD:再配信回数がRabbitMQの中やAMQPプロトコルで実装されていない間、あなたはあなたのcatchブロックで再配信フラグをnackメッセージ

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

に持って

無限NACKされたメッセージを注意してくださいまったく。

あなたは、このようなメッセージを台無しにしたいと、単にあなたがnackメソッド呼び出しの前に、いくつかのsleep()またはusleep()を追加することもでき、いくつかの遅延を追加したいのですが、それがすべてでは良い考えではありませんしていない場合。

サイクルの再配信の問題に対処するための複数の技術がある:信頼性の高い、標準、明確な

  • 短所:

    1. Dead Letter Exchanges

    • プロにお任せを
    追加のロジックが必要

    2。per message or per queue TTL

    • 賛否を使用します。簡単に、標準も、明確な
    • 短所を実装する:長い行列であなたには、いくつかのメッセージ

    例(ノートを失うことがあり、それはキューTTLのために、我々は唯一の番号を渡しますそして、メッセージのTTLのために - 何だろう数値文字列):メッセージのTTLパー

    2.1:

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish(
        'message at ' . microtime(true), 
        null, 
        AMQP_NOPARAM, 
        array(
         'expiration' => '1000' 
        ) 
    ); 
    

    2.2。

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->setArgument('x-message-ttl', 1000); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish('message at ' . microtime(true)); 
    

    3.ホールド再配信は、メッセージ本文に再配信数を(別名IPスタックの制限またはTTLをホップ)カウントまたは左またはヘッダ

    • 長所:キューTTLあたりのあなたに余分な制御を与えますメッセージの有効期限:アプリケーションレベル
    • 短所:メッセージを修正して再度公開する必要がある間に、アプリケーション固有の、明確ではない

    コード:

    $queue = new AMQPQueue($channel); 
    $queue->setName('my-queue'); 
    $queue->declareQueue(); 
    $queue->bind('my-exchange'); 
    
    $exchange->publish(
        'message at ' . microtime(true), 
        null, 
        AMQP_NOPARAM, 
        array(
         'headers' => array(
          'ttl' => 100 
         ) 
        ) 
    ); 
    
    $queue->consume(
        function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
         $headers = $msg->getHeaders(); 
         echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
         echo $msg->getDeliveryTag(), ' '; 
         echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
         echo $msg->getBody(), PHP_EOL; 
    
         try { 
          //Do some business logic 
          throw new Exception('business logic failed'); 
         } catch (Exception $ex) { 
          //Log exception 
          if (isset($headers['ttl'])) { 
           // with ttl logic 
    
           if ($headers['ttl'] > 0) { 
            $headers['ttl']--; 
    
            $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
           } 
    
           return $queue->ack($msg->getDeliveryTag()); 
          } else { 
           // without ttl logic 
           return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
          } 
    
         } 
    
         return $queue->ack($msg->getDeliveryTag()); 
        } 
    ); 
    

    が流れを再配信優れた制御メッセージにはいくつかの他の方法かもしれあります。

    結論:銀の弾丸の溶液はありません。 )

  • +0

    あなたの答えをありがとう。 'redelivered'は本当に' true'に設定されていますが、ブロックしている消費者を再起動してメッセージを再調査する必要があります。 –

    +0

    ありがとう、これは私が必要としていたものです。無限に再配信されたメッセージを防ぐ方法をいくつか教えてください。待ち行列へのキューイングを秒単位で遅らせることができればいいと思うので、消費するサーバーに負荷がかかりません。 –

    +0

    ここに行くと、もう一度答えが更新されました – pinepain

    0

    コンシューマを再起動したくない場合は、basic.recover AMQPコマンドがあなたのものである可能性があります。あなたはあなたのニーズに合った解決方法を選択する必要があります。欲しいです。 AMQP protocol

    basic.recover(bit requeue) 
    
    Redeliver unacknowledged messages. 
    
    This method asks the server to redeliver all unacknowledged messages on a specified channel. 
    Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
    
    +0

    このメソッドは、私が使用しているクライアントAPIの一部ではないようです。 http://www.php.net/manual/en/book.amqp.php –

    +1

    RabbitMQはこの方法の一部をサポートしています。[official doc on it](https://www.rabbitmq.com/specification.html# method-status-basic.recover) – pinepain