これは、Djangoテストケース内でCeleryワーカーを開始することで可能です。
背景
Djangoのインメモリデータベースはsqlite3です。 the description page for Sqlite in-memory databasesに記載されているように、「インメモリデータベースを共有する[A] 11個のデータベース接続は同じプロセス内にある必要があります。これは、Djangoがメモリ内のテストデータベースを使用し、セロリが別のプロセスで開始されている限り、CeleryとDjangoにテストデータベースを共有させることは基本的に不可能であることを意味します。
ただし、celery.contrib.testing.worker.start_worker
を使用すると、同じプロセス内の別のスレッドでCeleryワーカーを開始することができます。この作業者は、メモリ内のデータベースにアクセスできます。
これは、セロリが既にDjangoプロジェクトでthe usual wayに設定されていることを前提としています。
ソリューション
ジャンゴ - セロリは、いくつかのクロススレッド間通信を必要とするので、孤立したトランザクションで実行されていないだけでテストケースが動作します。テストケースは、SimpleTestCase
またはその残りの同等物APISimpleTestCase
から直接継承し、クラス属性allow_database_queries
をTrue
に設定する必要があります。
キーは、setUpClass
メソッドTestCase
でセロリの作業を開始し、tearDownClass
メソッドで閉じます。キーの機能は、おそらくmysite.celery.app
から得られた、それぞれsetUpClass
及びtearDownClass
に呼び出されなければならない__enter__
と__exit__
方法を有するパイソンContextManager
を返し、現在のセロリアプリケーションのインスタンスを必要とする、celery.contrib.testing.worker.start_worker(app)
あります。おそらく、手動で入力してデコレータなどでContextManager
を使用しないようにする方法がありますが、わかりませんでした。ここでは一例tests.py
ファイルです:
from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase
from mysite.celery import app
class BatchSimulationTestCase(SimpleTestCase):
allow_database_queries = True
@classmethod
def setUpClass(cls):
super().setUpClass()
# Start up celery worker
cls.celery_worker = start_worker(app)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# Close worker
cls.celery_worker.__exit__(None, None, None)
def test_my_function(self):
# my_task.delay() or something
が何らかの理由で、検査員は、労働者に障害が発生した場合に、より良いエラーメッセージを提供するために、おそらく、'celery.ping'
と呼ばれるタスクを使用しようとします。さらに、キーワード引数として〜False
を設定しても、その存在の有無はまだ確認されています(start_worker
)。探しているタスクはcelery.contrib.testing.tasks.ping
です。ただし、このタスクはデフォルトではインストールされません。このタスクをcelery.contrib.testing
にINSTALLED_APPS
に追加してsettings.py
に追加することで可能です。しかし、これは作業者に見えるようにするだけです。作業者を生成するコードではありません。ワーカーを生成するコードはassert 'celery.ping' in app.tasks
で失敗します。これをコメントアウトするとすべてが機能しますが、インストールされたライブラリを変更するのは良い解決策ではありません。私はおそらく何か間違ったことをやっているが、どこかでそのようなcelery.py
として、app.autodiscover_tasks()
によってピックアップすることができ、私は上の和解問題を回避するには、単純な関数をコピーしている:
今
@app.task(name='celery.ping')
def ping():
# type:() -> str
"""Simple task that just returns 'pong'."""
return 'pong'
、テストが実行されたときに、ノーあり別のセロリプロセスを開始する必要があります。 CeleryワーカーはDjangoテストプロセスで別のスレッドとして開始されます。この作業者は、デフォルトのメモリ内テストデータベースを含む、メモリ内のデータベースをすべて見ることができます。労働者の数を制御するには、start_worker
で利用可能なオプションがありますが、デフォルトは1人の労働者です。
こんにちは、このアプローチは間違いなく面白そうです - しかし、私はこのメソッドを使用すると、セロリの労働者と通信することができないようです。私は余裕を持っていればそれを突きつけます。 – Marviel