2017-07-28 12 views
0

タスクが最大再試行回数(on_failure)に達したときに何が起きるかをオーバーライドするために、カスタムCeleryタスククラスを作成しています。タスクが失敗した場合、ユーザーモデルのステータスを更新する必要があります。以下はCeleryタスクのon_failureメソッドにパラメータを渡す

は私のカスタムタスククラスです:

class ReadyTask(Task): 

    def run(self, user): 
     try: 
      user.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     user.status = Status.READY 
     user.save() 

はどうやってそのステータスを更新するために、on_failure()方法にUserオブジェクトを渡すことができますか?

答えて

0

argsまたはkwargsを、あなたがそれをパラメータとして呼び出してタスクに送信すると、ユーザーのIDを検査できると思います。 argwの位置をチェックする必要がないので、kwargsで簡単に作成すると簡単です。その後、IDからあなたのユーザーをつかんで変更を加えますか?

したがって、function.apply(kwargs)、またはfunction.apply_async(kwargs=kwargs)またはfunction.delay(kwargs)を経由して、run関数にではなく、あなたが呼び出しているタスクである関数の引数/キーワード引数としてそれを送信しません。

ので:

user_id = kwargs.get('user_id')

# then resolve to user object, then update object

0

また、カスタムタスククラスにオブジェクトをバインドすることができます。 bind=Trueを使用します。

class ReadyTask(Task): 

    def run(self, user): 

     self.user_object = user 

     try: 
      self.user_object.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     self.user_object.status = Status.READY 
     self.user_object.save() 

@app.task(base=ReadyTask, bind=True) 
def do_stuff(self, *args, **kwargs): 
    self.user_object.do_stuff() 
関連する問題