私はセロリーの新人です。このタスクキューをプロジェクトに統合しようとしましたが、セカリが失敗したタスクをどのように処理するのかまだ分かりません私はそれらをすべてamqpデッドレターのキューに入れておきたいと思います。Celery:失敗したタスクをデッドレターキューにルーティングするにはどうすればいいですか
文書hereによると、acks_lateを有効にしたタスクで拒否を呼び出すと、メッセージを確認するのと同じ効果が得られ、デッドレターキューについていくつかの言葉があります。
だから、私はセロリの設定
celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],
CELERY_TASK_SERIALIZER='json',
CELERY_QUEUES=[CELERY_QUEUE,
CELERY_DLX_QUEUE],
CELERY_DEFAULT_QUEUE=CELERY_QUEUE_NAME,
CELERY_DEFAULT_EXCHANGE=CELERY_EXCHANGE
)
にカスタムデフォルトのキューを追加し、私の昆布オブジェクトが
CELERY_DLX_EXCHANGE = Exchange(CELERY_DLX_EXCHANGE_NAME, type='direct')
CELERY_DLX_QUEUE = Queue(CELERY_DLX_QUEUE_NAME, exchange=DLX_EXCHANGE,
routing_key='celery-dlq')
DEAD_LETTER_CELERY_OPTIONS = {'x-dead-letter-exchange': CELERY_DLX_EXCHANGE_NAME,
'x-dead-letter-routing-key': 'celery-dlq'}
CELERY_EXCHANGE = Exchange(CELERY_EXCHANGE_NAME,
arguments=DEAD_LETTER_CELERY_OPTIONS,
type='direct')
CELERY_QUEUE = Queue(CELERY_QUEUE_NAME,
exchange=CELERY_EXCHANGE,
routing_key='celery-q')
のように見ていると私は実行していたタスクは次のとおりです。
class HookTask(Task):
acks_late = True
def run(self, ctx, data):
logger.info('{0} starting {1.name}[{1.request.id}]'.format(self.__class__.__name__.upper(), self))
self.hook_process(ctx, data)
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.error('task_id %s failed, message: %s', task_id, exc.message)
def hook_process(self, t_ctx, body):
# Build context
ctx = TaskContext(self.request, t_ctx)
logger.info('Task_id: %s, handling request %s', ctx.task_id, ctx.req_id)
raise Reject('no_reason', requeue=False)
私はそれを少しテストしましたが、拒否例外を発生させても結果はありません。
ここで、Task.on_failureを無効にすることによって、失敗したタスクルートをデッドレターキューに強制することをお勧めしますか?私はこれがうまくいくと思いますが、私は赤いセロリがこれだけで何をするべきかに応じて、この解決策はとても清潔ではないとも考えています。
ありがとうございました。
誰も答えなかったことを悲しんでいる。あなたはこれに対する解決策を見つけましたか? @onizukaek –