2016-06-29 16 views
6

私はセロリーの新人です。このタスクキューをプロジェクトに統合しようとしましたが、セカリが失敗したタスクをどのように処理するのかまだ分かりません私はそれらをすべてamqpデッドレターのキューに入れておきたいと思います。Celery:失敗したタスクをデッドレターキューにルーティングするにはどうすればいいですか

文書hereによると、acks_lateを有効にしたタスクで拒否を呼び出すと、メッセージを確認するのと同じ効果が得られ、デッドレターキューについていくつかの言葉があります。

だから、私はセロリの設定

celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'], 
         CELERY_TASK_SERIALIZER='json', 
         CELERY_QUEUES=[CELERY_QUEUE, 
             CELERY_DLX_QUEUE], 
         CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME, 
         CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE 
         ) 

にカスタムデフォルトのキューを追加し、私の昆布オブジェクトが

CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct') 
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE, 
          routing_key='celery-dlq') 

DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME, 
          'x-dead-letter-routing-key': 'celery-dlq'} 

CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME, 
           arguments=DEAD_LETTER_CELERY_OPTIONS, 
           type='direct') 

CELERY_QUEUE = Queue(CELERY_QUEUE_NAME, 
         exchange=CELERY_EXCHANGE, 
         routing_key='celery-q') 

のように見ていると私は実行していたタスクは次のとおりです。

class HookTask(Task): 
    acks_late = True 

def run(self, ctx, data): 
    logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self)) 
    self.hook_process(ctx, data) 


def on_failure(self, exc, task_id, args, kwargs, einfo): 
    logger.error('task_id %s failed, message: %s', task_id, exc.message) 

def hook_process(self, t_ctx, body): 
    # Build context 
    ctx = TaskContext(self.request, t_ctx) 
    logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id) 
    raise Reject('no_reason', requeue=False) 

私はそれを少しテストしましたが、拒否例外を発生させても結果はありません。

ここで、Task.on_failureを無効にすることによって、失敗したタスクルートをデッドレターキューに強制することをお勧めしますか?私はこれがうまくいくと思いますが、私は赤いセロリがこれだけで何をするべきかに応じて、この解決策はとても清潔ではないとも考えています。

ありがとうございました。

+1

誰も答えなかったことを悲しんでいる。あなたはこれに対する解決策を見つけましたか? @onizukaek –

答えて

1

CELERY_EXCHANGEにarguments=DEAD_LETTER_CELERY_OPTIONSを追加しないでください。 CELERY_QUEUEにqueue_arguments=DEAD_LETTER_CELERY_OPTIONSで追加する必要があります。

次の例は、私がやったことであり、それは正常に動作します:

from celery import Celery 
from kombu import Exchange, Queue 
from celery.exceptions import Reject 

app = Celery(
    'tasks', 
    broker='amqp://[email protected]:5672//', 
    backend='redis://localhost:6379/0') 

dead_letter_queue_option = { 
    'x-dead-letter-exchange': 'dlx', 
    'x-dead-letter-routing-key': 'dead_letter' 
} 

default_exchange = Exchange('default', type='direct') 
dlx_exchange = Exchange('dlx', type='direct') 

default_queue = Queue(
    'default', 
    default_exchange, 
    routing_key='default', 
    queue_arguments=dead_letter_queue_option) 
dead_letter_queue = Queue(
    'dead_letter', dlx_exchange, routing_key='dead_letter') 

app.conf.task_queues = (default_queue, dead_letter_queue) 

app.conf.task_default_queue = 'default' 
app.conf.task_default_exchange = 'default' 
app.conf.task_default_routing_key = 'default' 


@app.task 
def add(x, y): 
    return x + y 


@app.task(acks_late=True) 
def div(x, y): 
    try: 
     z = x/y 
     return z 
    except ZeroDivisionError as exc: 
     raise Reject(exc, requeue=False) 

キューを作成した後、あなたは「特長」欄にそれを見る必要があり、それはDLX(デッドレター交換)を示し、およびDLK(デッドレタールーティングキー)のラベルです。

enter image description here

注:RabbitMQの中でそれらを既に作成している場合は、前のキューを削除する必要があります。これは、セロリが既存のキューを削除して新しいキューを再作成しないためです。

+0

あなたの例は私のためには機能しません。 'dead_letter_queue'を' task_queues'から削除する必要があります。それ以外の場合、セロリのワーカーはこのキューに接続し、メッセージを処理します(処理なし)。これらのキューを作成するには別の方法が必要です。 –

+0

@ K.P。私も同じ問題に出会った。あなたは絶対に正しいです。あなたのソリューションに感謝します。私はそれをテストし、それは素晴らしい動作します! :-) –

1

私は同様のケースを抱えており、同じ問題に直面しました。私はまた、ハードコードされた値ではなく構成に基づいたソリューションを求めていました。 Hengfeng Liの提案された解決策は非常に有用であり、私がメカニズムと概念を理解するのを助けました。しかし、デッドレター待ち行列の宣言には問題がありました。具体的には、task_default_queuesにDLQを注入した場合、Celeryはキューを消費していて、常に空でした。したがって、DL(X/Q)を宣言する手動の方法が必要でした。

私はCeleryのBootstepsを使用しました。コードが実行された段階で適切なコントロールを提供するためです。私の最初の実験は、アプリケーションの作成後にそれらを正確に作成することでしたが、プロセスのフォーク後に接続が停止し、醜い例外が作成されました。 Poolステップの直後に実行されるブートステップでは、フォークされて接続プールが準備された後、各ワーカーの初めに実行されることが保証されます。

最後に、セロリのRejectを使って未知の例外をタスク拒否に変換するデコレータを作成しました。リトライなど、処理方法がすでに決定されている場合は特に注意が必要です。

完全な実例があります。タスクdiv.delay(1, 0)を実行して、その動作を確認してください。

from celery import Celery 
from celery.exceptions import Reject, TaskPredicate 
from functools import wraps 
from kombu import Exchange, Queue 

from celery import bootsteps 


class Config(object): 

    APP_NAME = 'test' 

    task_default_queue = '%s_celery' % APP_NAME 
    task_default_exchange = "%s_celery" % APP_NAME 
    task_default_exchange_type = 'direct' 
    task_default_routing_key = task_default_queue 
    task_create_missing_queues = False 
    task_acks_late = True 

    # Configuration for DLQ support 
    dead_letter_exchange = '%s_dlx' % APP_NAME 
    dead_letter_exchange_type = 'direct' 
    dead_letter_queue = '%s_dlq' % APP_NAME 
    dead_letter_routing_key = dead_letter_queue 


class DeclareDLXnDLQ(bootsteps.StartStopStep): 
    """ 
    Celery Bootstep to declare the DL exchange and queues before the worker starts 
     processing tasks 
    """ 
    requires = {'celery.worker.components:Pool'} 

    def start(self, worker): 
     app = worker.app 

     # Declare DLX and DLQ 
     dlx = Exchange(
      app.conf.dead_letter_exchange, 
      type=app.conf.dead_letter_exchange_type) 

     dead_letter_queue = Queue(
      app.conf.dead_letter_queue, 
      dlx, 
      routing_key=app.conf.dead_letter_routing_key) 

     with worker.app.pool.acquire() as conn: 
      dead_letter_queue.bind(conn).declare() 


app = Celery('tasks', broker='pyamqp://[email protected]//') 
app.config_from_object(Config) 


# Declare default queues 
# We bypass the default mechanism tha creates queues in order to declare special queue arguments for DLX support 
default_exchange = Exchange(
    app.conf.task_default_exchange, 
    type=app.conf.task_default_exchange_type) 
default_queue = Queue(
     app.conf.task_default_queue, 
     default_exchange, 
     routing_key=app.conf.task_default_routing_key, 
     queue_arguments={ 
      'x-dead-letter-exchange': app.conf.dead_letter_exchange, 
      'x-dead-letter-routing-key': app.conf.dead_letter_routing_key 
     }) 

# Inject the default queue in celery application 
app.conf.task_queues = (default_queue,) 

# Inject extra bootstep that declares DLX and DLQ 
app.steps['worker'].add(DeclareDLXnDLQ) 


def onfailure_reject(requeue=False): 
    """ 
    When a task has failed it will raise a Reject exception so 
    that the message will be requeued or marked for insertation in Dead Letter Exchange 
    """ 

    def _decorator(f): 
     @wraps(f) 
     def _wrapper(*args, **kwargs): 

      try: 
       return f(*args, **kwargs) 
      except TaskPredicate: 
       raise # Do not handle TaskPredicate like Retry or Reject 
      except Exception as e: 
       print("Rejecting") 
       raise Reject(str(e), requeue=requeue) 
     return _wrapper 

    return _decorator 


@app.task() 
@onfailure_reject() 
def div(x, y): 
    return x/y 

編集:私はセロリ4.1.0でいくつかの互換性の問題を発見したとして、セロリ(小文字)の新しい構成スキーマを使用するコードを更新しました。

関連する問題