2
My TaskBにはTaskAが必要です。完了すると、TaskAはMySQLテーブルに書き込みを行い、TaskBはこの出力を入力としてテーブルに取り込みます。LuigiワークフローのMySQLターゲット
私はルイージでこれを行う方法を理解できないようです。誰かが私に例を示すか、ここで簡単な例を教えてもらえますか?
My TaskBにはTaskAが必要です。完了すると、TaskAはMySQLテーブルに書き込みを行い、TaskBはこの出力を入力としてテーブルに取り込みます。LuigiワークフローのMySQLターゲット
私はルイージでこれを行う方法を理解できないようです。誰かが私に例を示すか、ここで簡単な例を教えてもらえますか?
luigiの既存のMySqlTargetは、タスクが完了した時点を示すために別のマーカーテーブルを使用します。ここでは、私が取るだろうラフなアプローチです...しかし、あなたの質問は非常に抽象的なので、実際にはもっと複雑になる可能性があります。
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget
class TaskA(luigi.Task):
rundate = luigi.DateParameter(default=datetime.now().date())
target_table = "table_to_update"
host = "localhost:3306"
db = "db_to_use"
user = "user_to_use"
pw = "pw_to_use"
def get_target(self):
return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
update_id=str(self.rundate))
def requires(self):
return []
def output(self):
return self.get_target()
def run(self):
#update table
self.get_target().touch()
class TaskB(luigi.Task):
def requires(self):
return [TaskA()]
def run(self):
# reading from target_table
ありがとうございます。これは本当に役立ちます。私は1つの質問があった?つまり、行の主IDであるupdate_idを使用して、どの行が更新されているかをMySqlTargetが追跡していることを意味します。そして、もし私のプライマリIDがオートインクリメントなら、私は何をしますか? –
ああ、それは面倒です。私はあなたがupdate_idとしてautoincrement idを超えて別の一意の値を使用する必要があると思います。それは文字通り "INSERT INTO {marker_table} DUPLICATE KEY UPDATE ON(UPDATE_ID、TARGET_TABLE) VALUES(%sは、%sの) UPDATE_ID = VALUES(UPDATE_ID) """ .format(marker_table = self.marker_table "" '実行しています)、 (self.update_id、self.table) ' – MattMcKnight
解決策のようですので、ワークフローの更新が記録される更新テーブルがありますか?しかし、私は各タスクのためのテーブルを維持したくありません(そして私は多くのタスクがあります。)何が起こっているのあなたのSQLの解釈から、私はこれを行う必要がありますように見える 'return MySqlTarget(host = self.host、database –