2017-12-29 39 views
1

実際のセロリグループではなく、タスクのグループのステータスで、クライアントからビューにメッセージを「正常に」送信しています。問題は:これは実際にすべてのタスクが実際に実行されているかどうかを無視します。私はコールバック(task.apply_async(link=))を追加しようとしましたが、それはどちらも役に立たなかった。自身が実際に多くの時間を取ることはありませんが、私は本当にタスクが実際に実行されたとき、カウンタをインクリメントできるようにしたいと思いdjango-channels/celery:タスクリストの進捗状況を追跡する方法は?

タスク:

if 'selected' in request.GET: 
     selected_as_list = request.GET.getlist('selected') 
     print(selected_as_list) 
     searches = list(set([s.strip() for s in selected_as_list if s.strip()])) 
     task_group = [refresh_func.s(str(user_profile.id), search, dont_auto_add=True) for search in searches] 

     for i,task in enumerate(task_group): 
      task.apply_async() 
      Group(str(request.user.id)).send({"text": json.dumps({"tasks_completed": i+1, 
                    "task_id": "fb_import", 
                    "completed": True if i == len(task_group) -1 else False, 
"total": len(task_group)})}) 

だから私は、コードを移動します実際に操作を実行するのと同じブロックに移動します。今は多くのパラメータを渡していたことを意味しましたが、これは最初の問題を解決しました。しかし、それは別のものを提示する:インデックスが "1"のタスクは、インデックス "3"のタスクの後に終了することができ、これは明らかにカウンタを誤って更新する。

これを解決するには何ができますか?

答えて

1

スポーンされたタスクのステータスを定期的にチェックするバックグラウンドスレッドを作成すると、タスクのIDを知っていればステータスを取得できますか?

あなたdjango-channelが有効になっているところそれはおそらくですので、このスレッドは(ないセロリタスクで)Djangoのサーバーで実行する必要があります:あなたは仕事でGroup(...).sendを呼び出した場合、おそらく、特別ので、(それにアクセスすることはできません通常セロリ労働者が別々のプロセス/マシンで実行)

.GETビューの実装でタスクを生成するとします。たぶん、タスクIDを収集して(そこではどこに生まれているのか)、スレッドのステータスを定期的にチェックすることができます(したがって、.GETの応答をブロックしないでください)。

のは、あなたのタスクを起動ビューは次のようになりましょう:

class Test(generic.TemplateView): 
    template_name = 'stack_092.html' 

    def get(self, request, *args, **kwargs): 
     logger.info("Yep") 
     task_group = [foo_task.s(i) for i in range(5)] 
     logger.info("Task signatures created: %s", task_group) 

     task_ids = [task.apply_async().task_id for task in task_group] 
     logger.info("Tasks launched") 
     th = threading.Thread(target=verify_task_ids, args=('request.user.id', task_ids)) 
     th.start() 
     logger.info("Thread started") 
     return super(Test, self).get(request, *args, **kwargs) 

そして、このようなものは、スレッドのためverify_task_idsターゲット機能のようになります。

def verify_task_ids(channel_group_id, task_ids): 
    previous_finished_task_ids = set() 
    finished_task_ids = set() 
    logger.info("Verifying %s task_ids", len(task_ids)) 
    while len(finished_task_ids) < len(task_ids): 
     finished_task_ids = set() 
     for task_id in task_ids: 
      if AsyncResult(task_id).ready(): 
       finished_task_ids.add(task_id) 
     if finished_task_ids != previous_finished_task_ids: 
      logger.info("%s new finished tasks", 
         len(finished_task_ids) - len(previous_finished_task_ids)) 
     previous_finished_task_ids = finished_task_ids 

例では、 channel_group_id引数は純粋にハードコードされた文字列"request.user.id"です。あなたのケースでは、サーバにログインしているユーザの実際のrequest.user.idで置き換える必要があります。それはあなたのグループIDですからです。

そして、あなたは新しいタスクが終了したとき、私は唯一のログメッセージを表示していることがわかります。ここでは

if finished_task_ids != previous_finished_task_ids: 
     logger.info("%s new finished tasks", 
        len(finished_task_ids) - len(previous_finished_task_ids)) 

ではなくlogger.info機能のあなたはおそらく

if finished_task_ids != previous_finished_task_ids: 
    Group(
     str(channel_group_id) 
    ).send(
     { 
      "text": json.dumps({ 
       "tasks_completed": len(finished_task_ids), 
       "task_id": "fb_import", 
       "completed": len(finished_task_ids) == len(task_ids), 
      }) 
     } 
    ) 

Iドンを呼び出す必要がどこです私はこのソリューションがうまくいくとは思えませんが、ちょっと試してみる価値がありますか?

+0

ありがとうございます。私はポーリングを避けようとしています。たぶん私はあなたの答えの一部を採用することができ、何かのために、このループの中で、どこかでタスクが実行されている間、ループをチェックします。 – zerohedge

+1

私は想像しています**:(**少なくとも、私にとっては、セロリの労働者はウェブサーバーとは異なるサーバーで動作する傾向があります。それはむしろ困難でした。しかし、あなたがそれをあなた自身で解決するならば、自分の質問に対する答えとしてあなたがしたことを加えることができますか?それは私にとっても興味深いものです。 – BorrajaX

+0

今日私は実際にそれをやっていました。現時点では非常に控えめで読みにくいですが、少しスパムのようですが、それは「非同期」です。私はそれがリファクタリングされたら答えを投稿しますが、ここでは一般的な考え方です:関数シグネチャを収集し、ヘルパー関数にリストとして送信し、forループで 'task.freeze()'を実行して、すべてのIDの状態が "SUCCESS"(これはチャネルを介してメッセージを送信する)を返すまで、このリストを反復する非同期タスクを開始します。それだけで、同様のforループでtask.apply_async()を実行します。 – zerohedge

関連する問題