タスクを管理するためにセロリを使用しようとしています。 私は多くのマイナーな仕事(電子メール、クロスサーバ投稿など)を持っています。 ファイルアップロードのような時間のかかるタスク。セロリー、1つのキューと複数のキューが並行して並んでいます
アップロードが常に1つずつ行われることを指定する方法はありますか。時間内に実行されるタスクは1つだけですが、他の作業者は他のキューで作業します。
タスクを管理するためにセロリを使用しようとしています。 私は多くのマイナーな仕事(電子メール、クロスサーバ投稿など)を持っています。 ファイルアップロードのような時間のかかるタスク。セロリー、1つのキューと複数のキューが並行して並んでいます
アップロードが常に1つずつ行われることを指定する方法はありますか。時間内に実行されるタスクは1つだけですが、他の作業者は他のキューで作業します。
タスクの実行をシリアル化する有効な方法は、相互排除(Mutual Exclusion)を使用することです。
Pythonのthreading
モジュールはused to this effectすることができa Lock
objectがあります
# ...
module_lock = threading.Lock() # or make this an attribute in an object with sufficiently-large scope
# ...
def do_interesting_task():
with module_lock.acquire():
interesting_task()
"すべての希望、ここに入力して、あなたがたを放棄します。"
ミューテックスとセマフォは強力なツールですが、意図しない使用でデッドロックが発生し、時々あなたのランチを食べることがあります。
私はそのようなソリューションを実装していますが、かなりうまくいきます。 しかし、私はかなり確信していません、それはmax_retries = Noneはリトライ回数が無制限であると述べています。 このソリューションはredisで動作しますが、アトミックな増分操作をサポートする他のエンジンでも動作します。
@task(max_retries=None,default_retry_delay=3)
def sleepTask():
if r.incr('sleep_working')>1:
r.incr('sleep_working',-1)
sleepTask.retry()
else:
try:
r.expire('sleep_working',3600)
sleep(30)
finally:
r.incr('sleep_working',-1)
return True
ここで重要な点は、incrはアトミックであるため、2つのクライアントがcounter == 1を受け取ることはありません。
も期限切れには何でも起こることができ、私たちは永遠に> 1私たちのカウンタを取得しますので、期限切れには、特定の時間後にカウンタが削除されます、どんなことを、確認します、非常に重要です。この値は必要に応じて調整できます。私の大きなファイルはアップロードされているので、3600はOKです。
私はこれが良いスタートポイントであると考えています。これは、自動的にredis_keyとexpire_timeの値を受け取ることでカスタムタスクオブジェクトを作成します。このような仕事をするなら、私はこの記事を更新します。ボーナスとして
、この解決策はまた、容易
anynumber>に> 1を変更することにより、私は、この意志は労働者を横切って動作することと思ういけない、/ 2/3と平行限界の任意の他の数を調整することができるされています。各作業者はまったく新しいmodule_lockを持つか、間違っていますか?とにかく、この問題を解決する私の方法を投稿してください。それについてのコメントを聞いてうれしいです。多分、それはいくつかの深刻な不足があります。私は最初のアプローチでは見ていません。 – Tigra
'module_lock'はプロセスごとになります。 –