2012-08-23 7 views
6

これはばかげた質問であるならば、私は謝罪し、恥で私の頭を隠して行くだろう、しかし:パイソン/ RQ - 監視員の状況

私はPythonでジョブをキューにRQを使用しています。私はこのように動作します:

  1. ジョブAが起動します。ジョブAは、Web APIを介してデータを取得し、それを保存します。
  2. ジョブAが実行されます。
  3. ジョブAが完了しました。
  4. Aが完了すると、ジョブBが開始します。ジョブBは、ジョブAによって保存された各レコードをチェックし、追加の応答データを追加します。
  5. ジョブBが完了すると、ユーザーはレポートの準備完了を知らせる幸せな電子メールを受け取ります。これまで

マイコード:

redis_conn = Redis() 
use_connection(redis_conn) 
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later 
w = Worker(q) 
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid) 
w.work() 

私は、とき仕事私の最善の解決策は、ジョブAを監視することができる2、労働者、ジョブA用とBの1ジョブBの労働者を持っていることだったと仮定し、 Aが完了したら、仕事Bで仕事を始めてください。

私の人生を救うことができないのは、私がどのようにして別の労働者のステータスを監視するかです。 job.idでジョブAからジョブIDを取得できます。私はw.nameでワーカー名をつかむことができます。しかし、私がどのようにその情報を他の労働者にどのように渡すかについて最も控えめではない。

または、これを行うためのもっと簡単な方法がありますか?完全に欠落していますか?

+1

ジョブAが完了するまでジョブBを実行できない場合(つまり、並列実行できないことを意味する)、なぜrqを使用するのですか?順番に処理する(アプリケーションをブロックしたくない場合は別のスレッドまたはプロセスで) –

+0

AとBのジョブはそれぞれ非常に時間がかかり、別々に発生する可能性があります多くの仕事をAのジョブとは独立したものにしてください。それが難しいとすれば、私は降伏するかもしれません。 – user1066609

+0

一緒に行くAとBのペアがありますか、あるいはBはどんなAにも依存できますか?後者の場合、同期化の問題が1つあります。 :-) –

答えて

0

あなたはおそらく、あなたのプロジェクトに移行するには余裕がありますが、そうでなければTwistedを見てください。 http://twistedmatrix.com/trac/私は現在、APIにヒットしたり、Webコンテンツをスクラップしたりするプロジェクトにこれを使用しています。並行して複数のジョブを実行し、特定のジョブを順番に整理するので、ジョブBはジョブAが完了するまで実行されません。

これは、試したい場合にTwistedを学習するためのベストチュートリアルです。 rqドキュメント上this pageからhttp://krondo.com/?page_id=1327

0

ジョブAとジョブBが1つの関数で行うことを結合し、次に例を使用します。 multiprocessing.Pool(それはmap_asyncメソッド)を使用して、異なるプロセスでそれを栽培します。

私はrqに慣れていませんが、multiprocessingは標準ライブラリの一部です。デフォルトでは、CPUのコア数と同じ数のプロセスが使用されますが、これは通常、マシンを飽和させるのに十分です。

2

jobオブジェクトは、あなたがチェックすることができjob.resultから呼び出すresult属性を有しているように、それが見えます。ジョブが終了していない場合はNoneになりますが、ジョブが何らかの値(たとえば"Done"さえ)を返すようにしておけば、他のワーカーに最初のジョブの結果を確認させてから、 job.resultには値があり、最初の作業者が完了したことを意味します。

6

更新januari 2015は、このプル要求は現在マージされ、パラメータは、depends_onに変更され、すなわち:

:旧バージョンと、そのようなを実行している人々のためにそのまま残さ

second_job = q.enqueue(email_customer, depends_on=first_job) 

オリジナルのポスト

私はRQでジョブの依存関係を処理するプルリクエスト(https://github.com/nvie/rq/pull/207)を提出しました。このプル要求がでマージされます場合は、あなたが行うことができます:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

first_job = q.enqueue(generate_report) 
second_job = q.enqueue(email_customer, after=first_job) 
# In the second enqueue call, job is created, 
# but only moved into queue after first_job finishes 

今のところ、私は順番にあなたのジョブを実行するラッパー関数を書くことをお勧めします。例:

関連する問題