私は毎日実行するように設定したセロリのタスクで単体テストを実行しようとしています。
私は関数をインポートしようとしたが、私のテストでそれを呼び出すが、これは動作しません。セロリの定期タスクで単体テストを実行することはできますか?
タスクは次のとおりです。
@shared_task
def create_a_notification_if_a_product_is_in_or_out_of_season():
"""
Send a notification if a product is now in or out of season
"""
julian_date = date.today().timetuple().tm_yday + 1
active_products = Product.objects.filter(status='ACTIVE')
for products in active_products:
in_season_prd = ProductDescription.objects.filter(
product=products,
early_start_julian=julian_date
)
for prd in in_season_prd:
notification = Notification()
notification.type = notification_choices.PRODUCT_IN_SEASON
notification.description = str(prd.product.name) + " will be in season from tomorrow."
notification.save()
、ここでは、私のテストのうちの一つの例である:
def test_when_product_is_about_to_come_in_to_seasonality(self):
"""
Make a notification when a product is due to come in to seasonality tomorrow
"""
p = Product.objects.first()
p.status = "ACTIVE"
today = date.today().timetuple().tm_yday
p.early_start_julian = today + 1
create_a_notification_if_a_product_is_in_or_out_of_season()
updated_notifications = Notification.objects.all().count()
self.assertNotEqual(self.current_notifications, updated_notifications)
任意の助けをいただければ幸いです!
おかげ
でのみ、そのテストを飾ることができますし、私はタスクが、私は自己を比較したときにそのための通知を作成し、すべきで実行されていない同じ結果を持っています。 current_notifications with updated_notificationはまだ同じで、テストは失敗します –
これはまだ失敗した理由は、テストを実行する製品を作成した工場が原因であることに気付きました。インポートされたタスクの最後に.apply()を追加すると機能し、タスクが起動します。本当にありがとう! –
@BenCurrieを助けて嬉しいです:) –