2013-06-17 8 views
10

Djangoのユニットテストフレームワークでいくつかのセロリ機能をテストしようとしていますが、AsyncResultをチェックしようとすると、ユニットのセロリーでのAsyncResultのテスト

私はこのコードがRabbitMQの実際の環境で動作することを知っていますので、なぜテストフレームワークを使用してもうまくいかないのだろうかと思っていました。ここで

は一例です:

@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS = True, 
        CELERY_ALWAYS_EAGER = True, 
        BROKER_BACKEND = 'memory',) 
def test_celery_do_work(self): 
    result = myapp.tasks.celery_do_work.AsyncResult('blat') 
    applied_task = myapp.tasks.celery_do_work.apply_async((), task_id='blat') 
    applied_task.wait() 
    # THIS SUCCEEDS 
    self.assertTrue(applied_task.successful()) 
    # THIS FAILS 
    self.assertTrue(result.successful()) 

は、それがすぐに実行されますので、ALWAYS_EAGERオプションがAsyncResult機能を無効に使用していますか?もしそうなら、AsyncResultのステータスチェックをユニット化できる方法はありますか?私がALWAYS_EAGERオプションを取り出そうとすると、テストは決して実行されないので、私は紛失してしまいます。

ありがとうございます!

答えて

12

CELERY_ALWAYS_EAGERTrueの場合、apply_async()の呼び出しは実際にはapply()に置き換えられます。返される結果はEagerResultで、すでにタスクの結果が含まれています。

はい、設定すると、ALWAYS_EAGER = Trueは、AsyncResultの全機能を無効にします。非同期プロセス全体がバイパスされ、実際にブローカにタスクが送信されないため、結果はAsyncResultで取得できません。

CELERY_ALWAYS_EAGER = Trueを使用すると、セロリ結果だけが必要なコードパスをテストしているときに、EagerResultまたはAsyncResultと同じ方法で動作します。

AsyncResultでテストを実行する方法がありますが、これはCELERY_ALWAYS_EAGER = Falseですが、テストケースでタスクを呼び出す前にワーカーを開始する必要があります。作業者はあなたの仕事を実行することができ、AsyncResultはうまく動作します。 django-celery-testworkerを見ることができますが、私はそれをテストしていませんが、ちょうどそれを行うようです。

+0

私はそれがおそらくそのようなものだと考えましたが、EagerResultオブジェクトについてはわかりませんでした。彼らはいつもAsyncResultsになると思っていました。情報をありがとう! – oiez

+0

単体テストそのものから作業を開始するには[this](https://stackoverflow.com/a/43648921/1617295)を読むことをお勧めします。 – Raffi