2016-11-13 9 views
3

私はちょうどセロリーに精通し、質問があります。 、これは完璧に動作セロリグループ1つのデザインで複数のタスク

TASKS

@task 
def send_email(message): 
    mailserver.sendOneMessage(message) 

VIEWS

class newaccount(APIView): 
    def post(self, request, format=None): 
    send_email.delay(request.data.email) 

:私のセットアップは

は、電子メールを送信するタスクの例を取ることができますジャンゴ-のRedis、セロリですDjangoはRedisにメッセージを送信し、セリでそれらを取り出し、タスクを実行します。しかし、CeleryがRedisからのすべてのメッセージを特定の間隔で取得し、複数のメッセージで1つのタスクを実行するようにシステムを改善したいと考えています。これは、電子メールサーバーへの接続が遅いため、単一の要求として複数のメッセージを送信すると処理が高速になるためです。

私はこのような何かが仕事をしたい:

@task 
def send_emails(messages): 
    mailserver.sendMultipleMessages(messages) 

思考を

TASKS?新しい電子メールが

@shared_task() 
def add_email(user_id): 
    cache.set("email#{}".format(user_id), None, timeout=None) 

ステップ2をキャッシュするために追加のタスクを作成します

ステップ1:

+0

タスクに電子メールの配列を渡してから、その電子メールを送信するタスクをループしましたか? – Niloct

+0

@Niloctこれはできません。電子メールはユーザ認証ステップで追加されます。私は後で手動でレディスと定期的なセロリのタスクを使ってそれらをグループ化することができますが、私はそれを正しく設計する方法がわかりません。 –

答えて

2

私はキャッシュ(ジャンゴ-Redisの)としてのRedisを使用しておりますので、すでに私は、次のワークフローを実装しました。毎秒実行され、キャッシュに新しい電子メール

class ProcessEmailsTask(PeriodicTask): 
    run_every = timedelta(seconds=1) 

    def run(self, **kwargs): 
     call_email() 

def call_email(): 
    item_exists = True 
    ids = [] 
    while item_exists: 
    try: 
     key = next(cache.iter_keys("email#*")) 
     ids.append(key.split("email#")[1]) 
     cache.delete_pattern(key) 
    except: 
     item_exists = False 
    if len(ids) > 0: 
     send_emails_to(ids) 

ステップ3を実行し、両方のセロリの労働者とセロリビートと利益のためにルックアップする定期的なタスクを作成します!

関連する問題