2016-08-01 8 views
1

Flaskアプリケーションの前にPythonを使用しましたが、以前はCeleryを使用したことがありません。ドキュメントを読んですべてを設定した後(そして複数のワーカーでテストしたように動作します)、SQLクエリを実行しようとしています。クエリから返された各行に対して、セラーによって処理されるように送信しますワーカー。MySQLクエリから返された各行のCeleryタスクを実行しますか?

以下は、非常に基本的なコードのサンプルです。

from celery import Celery 
import MySQLdb 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME") 
    cur = db.cursor() 
    cur.execute("SELECT * FROM myTable") 

    for row in cur.fetchall(): 
     print_query_result(row[0]) 

    db.close() 

def print_query_result(result): 
    print result 

基本的に、 'myTable'テーブル内のすべてを選択し、返された各行に対してそれを印刷します。 Pythonだけを使ってコードを呼び出すと、うまく動作し、MySQLテーブルのすべてのデータが出力されます。 .delay()関数を使用してそれを呼び出すと、それをワーカーに送信して処理するだけで、ワーカーに送信され、データベースの一番上の行が出力されます。

私はサブタスクを読もうとしていましたが、私が正しい方向に向いているかどうかは分かりません。

要するに、私はこれが起きたいと思っていますが、私はそれをどこから始めるべきではありません。誰にもアイデアはありますか?

  • テーブル
  • 内のすべての行を選択するためのSQLクエリはキュー内の次の項目をピックアップバックデータベース
  • にいくつかのコード
  • リターンコードの結果を処理するために作業者にもたらす/各行を送る(IF任意)

ありがとうございます。

EDIT 1:

私の代わりにSQLAlchemyのを使用するために自分のコードを更新しましたが、結果はまだ大丈夫です私の古いクエリのように戻ってきています。

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     print i.domain 
     print_query_result.s() 

@app.task 
def print_query_result(): 
    print "Received job" 

print_domain.delay() 

労働者の.pyファイルのリターンを実行している:

[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] 
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1 
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2 
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None 

あなたが見ることができるように、作業員は、私が照会てるテーブルから「結果1」と「結果2」を取得しますが、その後、それはdoesnのサブタスクで "Job received"を印刷するだけのコマンドを実行しているようです。

更新:私のコードがこのように見えるようにサブタスクの最後に.delay()がなければならないように見えます。

from celery import Celery 
from models import DBDomains 

app = Celery('tasks', broker='redis://localhost:6379/0') 
@app.task 
def print_domain(): 
    query = DBDomains.query.all() 
    for i in query: 
     subtask = print_query_result.s(i.domain) 
     subtask.delay() 


@app.task 
def print_query_result(domain): 
    print domain 

print_domain.delay() 
+0

DBに対してクエリを実行するタスクが必要で、返された各行に対して別のタスクがキューに入れられますか?または、クエリを実行して新しいタスクを通常の関数にディスパッチする最上位レベルでは問題ありませんか? –

+0

ええ、基本的には、クエリを作成するタスクになりますし、各結果に対して、ワーカーが処理する別のタスク/キュー項目を生成します。その理由は、30秒ごとにクエリされるデータの行数千(数万ではないにしても)を扱うため、より速く実行されている従業員が処理されるほど多くのデータが処理されるためです。また、より多くの労働者に対処しなけれ私もスレッドを見ていましたが、セロリは簡単に見えました。遠隔の作業員にも拡大することができます。 – mphowarth

答えて

1

タスク内からタスクを呼び出すときはいつでも、subtasksを使用する必要があります。幸いにも構文は簡単です。

from celery import Celery 

app = Celery('tasks', broker='redis://127.0.0.1:6379/0') 


@app.task 
def print_domain(): 
    for x in range(20): 
     print_query_result.s(x) 


@app.task 
def print_query_result(result): 
    print(result) 

(クエリの結果と範囲のxの代替(20)。)そして、あなたはセロリの出力を監視していた場合、あなたはタスクが労働者全体に作成し、配布表示されます。

+0

は、残念ながら動作するようには思えない、私は私が今持っているコードで、私のOPを更新しました。それも、クエリからの結果は間違いなく戻ってきているにもかかわらず、正しく呼び出されているサブタスクのようには見えません。これまでの支援に感謝します。私は問題を発見したように見えるよう – mphowarth

+0

私は再び私のOPを更新しました。あなたの助けに感謝し、私を正しい方向に向ける! – mphowarth

+0

奇数は、バージョンの違いかもしれません。投稿したサンプルが正しく機能しました。 –

関連する問題