1
セロリタスクをクラスとして作成しようとしていますが、難しかったです。クラスは次のとおりです。セロリ:カスタムベースクラス/子クラスベースのタスクがapp.tasksの下に表示されない
class BaseCeleryTask(app.Task):
def is_complete(self):
""" default method for checking if celery task has completed. """
# simply return result (since by default tasks return boolean indicating completion)
try:
return self.result
except AttributeError:
logger.error('Result not defined. Make sure task has run!')
return False
class MacroReportTask(BaseCeleryTask):
def run(self, params):
""" Override the default run method with signal factory run"""
# hold on to the factory
process = MacroCountryReport(params)
self.result = process.run()
return self.result
が、私はアプリを初期化し、app.tasks(または実行ワーカー)を確認する場合、アプリはそのレジストリにこれらの上記のタスクを持っていないようです。他の機能ベースのタスク(using app.task() decorator
)はうまく登録されているようです。次のメッセージと
process = SignalFactoryTask()
process.delay(params)
セロリワーカーエラー: Received unregistered task of type None
私は問題があると思います。通常の関数ベースのタスクと同様に、カスタムレジストリにカスタムクラスを追加するにはどうすればいいですか?
助けてくれてありがとう!私はテストする必要がありますが、それが動作するかどうかをお知らせします! – kri