2016-11-03 24 views
2

My TaskBにはTaskAが必要です。完了すると、TaskAはMySQLテーブルに書き込みを行い、TaskBはこの出力を入力としてテーブルに取り込みます。LuigiワークフローのMySQLターゲット

私はルイージでこれを行う方法を理解できないようです。誰かが私に例を示すか、ここで簡単な例を教えてもらえますか?

答えて

5

luigiの既存のMySqlTargetは、タスクが完了した時点を示すために別のマーカーテーブルを使用します。ここでは、私が取るだろうラフなアプローチです...しかし、あなたの質問は非常に抽象的なので、実際にはもっと複雑になる可能性があります。

import luigi 
from datetime import datetime 
from luigi.contrib.mysqldb import MySqlTarget 


class TaskA(luigi.Task): 
    rundate = luigi.DateParameter(default=datetime.now().date()) 
    target_table = "table_to_update" 
    host = "localhost:3306" 
    db = "db_to_use" 
    user = "user_to_use" 
    pw = "pw_to_use" 

    def get_target(self): 
     return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table, 
          update_id=str(self.rundate)) 

    def requires(self): 
     return [] 

    def output(self): 
     return self.get_target() 

    def run(self): 
     #update table 
     self.get_target().touch() 


class TaskB(luigi.Task): 
    def requires(self): 
     return [TaskA()] 

    def run(self): 
     # reading from target_table 
+0

ありがとうございます。これは本当に役立ちます。私は1つの質問があった?つまり、行の主IDであるupdate_idを使用して、どの行が更新されているかをMySqlTargetが追跡していることを意味します。そして、もし私のプライマリIDがオートインクリメントなら、私は何をしますか? –

+0

ああ、それは面倒です。私はあなたがupdate_idとしてautoincrement idを超えて別の一意の値を使用する必要があると思います。それは文字通り "INSERT INTO {marker_table} DUPLICATE KEY UPDATE ON(UPDATE_ID、TARGET_TABLE) VALUES(%sは、%sの) UPDATE_ID = VALUES(UPDATE_ID) """ .format(marker_table = self.marker_table "" '実行しています)、 (self.update_id、self.table) ' – MattMcKnight

+0

解決策のようですので、ワークフローの更新が記録される更新テーブルがありますか?しかし、私は各タスクのためのテーブルを維持したくありません(そして私は多くのタスクがあります。)何が起こっているのあなたのSQLの解釈から、私はこれを行う必要がありますように見える 'return MySqlTarget(host = self.host、database –

関連する問題