Flaskアプリケーションの前にPythonを使用しましたが、以前はCeleryを使用したことがありません。ドキュメントを読んですべてを設定した後(そして複数のワーカーでテストしたように動作します)、SQLクエリを実行しようとしています。クエリから返された各行に対して、セラーによって処理されるように送信しますワーカー。MySQLクエリから返された各行のCeleryタスクを実行しますか?
以下は、非常に基本的なコードのサンプルです。
from celery import Celery
import MySQLdb
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
db = MySQLdb.connect(host="localhost", user="DB_USER", passwd="DB_PASS", db="DB_NAME")
cur = db.cursor()
cur.execute("SELECT * FROM myTable")
for row in cur.fetchall():
print_query_result(row[0])
db.close()
def print_query_result(result):
print result
基本的に、 'myTable'テーブル内のすべてを選択し、返された各行に対してそれを印刷します。 Pythonだけを使ってコードを呼び出すと、うまく動作し、MySQLテーブルのすべてのデータが出力されます。 .delay()関数を使用してそれを呼び出すと、それをワーカーに送信して処理するだけで、ワーカーに送信され、データベースの一番上の行が出力されます。
私はサブタスクを読もうとしていましたが、私が正しい方向に向いているかどうかは分かりません。
要するに、私はこれが起きたいと思っていますが、私はそれをどこから始めるべきではありません。誰にもアイデアはありますか?
- テーブル
- 内のすべての行を選択するためのSQLクエリはキュー内の次の項目をピックアップバックデータベース
- にいくつかのコード
- リターンコードの結果を処理するために作業者にもたらす/各行を送る(IF任意)
ありがとうございます。
EDIT 1:
私の代わりにSQLAlchemyのを使用するために自分のコードを更新しましたが、結果はまだ大丈夫です私の古いクエリのように戻ってきています。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
print i.domain
print_query_result.s()
@app.task
def print_query_result():
print "Received job"
print_domain.delay()
労働者の.pyファイルのリターンを実行している:
[2016-08-02 02:08:40,881: INFO/MainProcess] Received task: tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de]
[2016-08-02 02:08:41,036: WARNING/Worker-3] result1
[2016-08-02 02:08:41,037: WARNING/Worker-3] result2
[2016-08-02 02:08:41,039: INFO/MainProcess] Task tasks.print_domain[65d7667a-fc70-41f7-8caa-b991f360a9de] succeeded in 0.154022816569s: None
あなたが見ることができるように、作業員は、私が照会てるテーブルから「結果1」と「結果2」を取得しますが、その後、それはdoesnのサブタスクで "Job received"を印刷するだけのコマンドを実行しているようです。
更新:私のコードがこのように見えるようにサブタスクの最後に.delay()がなければならないように見えます。
from celery import Celery
from models import DBDomains
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def print_domain():
query = DBDomains.query.all()
for i in query:
subtask = print_query_result.s(i.domain)
subtask.delay()
@app.task
def print_query_result(domain):
print domain
print_domain.delay()
DBに対してクエリを実行するタスクが必要で、返された各行に対して別のタスクがキューに入れられますか?または、クエリを実行して新しいタスクを通常の関数にディスパッチする最上位レベルでは問題ありませんか? –
ええ、基本的には、クエリを作成するタスクになりますし、各結果に対して、ワーカーが処理する別のタスク/キュー項目を生成します。その理由は、30秒ごとにクエリされるデータの行数千(数万ではないにしても)を扱うため、より速く実行されている従業員が処理されるほど多くのデータが処理されるためです。また、より多くの労働者に対処しなけれ私もスレッドを見ていましたが、セロリは簡単に見えました。遠隔の作業員にも拡大することができます。 – mphowarth