2017-01-23 5 views
1

私はいくつかのluigiワークフローを書いていますが、私はタスクをデバッグしようとしています。そうするために、私は同じパラメータを何度も繰り返してこれらのタスクを再実行する必要があります。luigi:デバッグの目的でタスクを再実行しますか?

私はルイージタスクが冪等であり、それ以前と同じ入力が与えられたときにそのため、通常は再実行されず、これはワークフローをデバッグし後、生産が望まれるまさにであることを理解。しかし、開発中に、正確に同じ入力と出力でワークフローを再実行することは有用であり、私は必要と主張します。

開発中に各タスクでFalseを返すようにcomplete()メソッドをオーバーライドすることができます。ただし、これによりタスクは未完了の状態になります。

ワークフローを「開発」モードや「デバッグ」モードで実行するように設定する方法を探しています。そのため、実行してからもう一度実行することができます。ワークフローが欲しいものを正確に実行していると確信するまで、タスクは正しく実行されます。

luigiでこれを行う方法はありますか?

ありがとうございます。

================は================以下の私のコメントパー

が、それはと思わ後で追加します入力パラメータをタスクに変更しても、タスクは再実行されません。 output()メソッドが一意の値を返す場合に限り、そのタスクは再実行可能です。これは、入力パラメータを変更すると、入力パラメータが異なる別の呼び出しと同じ出力を返すかどうかにかかわらず、本当に冪等性のタスクを新しい一意のエンティティとして扱う必要があるため、「冪等」の定義に反しているようです。

次のコードは、この問題を示しています。 "x"パラメータはoutput()メソッドが返すファイル名を指定し、 "y"パラメータは出力ファイルの内容で使用されますが、出力ファイルの名前では使用されません。

"--x 10 --y 20"と " - x 10 - y 30"を指定してワークフローを呼び出すと、2回目の呼び出しでいずれのタスクも再実行されません。これは間違った動作だと私は信じています。しかし、ワークフローを "--x 10 - y 20"と " - x 11 --y 20"の順に呼び出すと、両方のタスクが実際に再実行されます。

#!/usr/bin/python3                            
# -*- python -*-                             

import luigi 

class Child(luigi.Task): 

    x = luigi.Parameter() 
    y = luigi.Parameter() 

    def requires(self): 
     return [] 

    def output(self): 
     return luigi.LocalTarget("child_{}.txt".format(self.x)) 

    def run(self): 
     with self.output().open('w') as f: 
      f.write('{} {}\n'.format(self.x, self.y)) 

class Parent(luigi.Task): 

    x = luigi.Parameter() 
    y = luigi.Parameter() 

    def requires(self): 
     return [ Child(self.x, self.y) ] 

    def output(self): 
     return luigi.LocalTarget("parent_{}.txt".format(self.x)) 

    def run(self): 
     with self.input()[0].open() as fin, self.output().open('w') as fout: 
      for line in fin: 
       fout.write("from command line: --x {} --y {}, from child: {}\n".format(self.x, self.y, line.strip())) 

if __name__ == '__main__': 
    luigi.run() 

答えて

0

あなたが言ったように、デバッグモードは素晴らしいでしょう。しかし、私はルイージはそういうものを持っていないと思う。

タスクがcomplete()メソッドを呼び出す前に、hereのようにターゲットを削除することができます。あなたのタスクは、このクラスのサブクラスでなければならないので、実行する前に--forceパラメータを使用してリセットすることができます。

タスクがローカルファイルを出力として持つ場合にのみ、この解決策が機能することに注意してください。 S3キー/バケット、データベースのテーブルや行などを削除するには、それをカスタマイズする必要があります。

+1

ありがとうございました。私のアプリケーションはS3バケットとRedshiftテーブルを作成しています。実際にタスクを再実行するためにこれらのアイテムを削除しなければなりませんでした。残念ながら、これは特定の場合には役立ちません。しかし、私は解決策を見つけた可能性があると思います。ワークフローにデバッグテストを実行し、このパラメータを各タスクの引数として渡すたびに一意のパラメータを生成すると、実際にすべてのタスクを再実行できます。ワークフローそれぞれのタスクを構造化してDictParameterを引数にとり、デバッグの実行中にこの一意のIDを渡します。それは働くことを望む。 – HippoMan

+0

明日の朝にこれをさらに詳しくテストし、私の所見をここに報告します。 – HippoMan

+0

さて、私は間違っていた。私は間違って、入力をタスクに変更することによって(つまり、一意の入力パラメータを追加することによって)、ルイジはいつもタスクを再実行すると考えました。しかし、それは明らかにそうではありません。そのタスクへの入力パラメータに関係なく、タスクを再実行する前に、出力を一意にする必要があります。これは「偶像崇拝者」の厳密な定義に反しているようですが、私はこれで生きなければならないと思います。しかたがない。 – HippoMan

関連する問題