2016-06-22 16 views
0

私はLuigiを使用していくつかのタスクを実行していますが、出力を標準化されたファイルの場所に一括転送する必要があります。私はこれを行うには上書きcomplete()方法でWrapperTaskを書いた:WrapperTaskの.complete()を再試行してください

from luigi.task import flatten 

class TaskX(luigi.WrapperTask): 
    date = luigi.DateParameter() 
    client = luigi.s3.S3Client() 

    def requires(self): 
     yield TaskA(date=self.date)  
     yield TaskB(date=self.date) 

    def complete(self): 
     tasks_complete = all(r.complete() for r in flatten(self.requires())) 

     ## at the end of everything, batch copy the files 
     if tasks_complete: 
      self.client.copy('current-old', 'current') 
      return True 
     else: 
      return False 


if __name__ == "__main__": 
    luigi.run() 

が、私は、トラブルのプロセスが実際に終了したときに呼び出されるcomplete()の条件付きの一部を取得を抱えています。

asynchronous behaviorが他の人によって指摘されていると思われますが、修正方法がわかりません。

私は、これらのコマンドラインパラメータとルイージを実行しようとしました:

$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task 

しかし、それは正しく動作していないようです。このタイプのタスクを処理するための正しいアプローチですか?

また、私は興味があります - --worker-retry-external-taskコマンドの経験がありましたか?私はそれを理解するいくつかの問題を抱えています。 source code

def _is_external(task): 
    return task.run is None or task.run == NotImplemented 

はLuigiTaskがWrapperTaskはないrun()方法を有しているか否かを判断するために呼び出されます。したがって、私は--retry-external-taskフラグが完了するまでこのためにcomplete()を再試行し、アクションを実行することを期待しています。しかし、通訳の周りを遊んでいるだけで、私はそれを信じるようになります。

>>> import luigi_newsletter_process 
>>> task = luigi_newsletter_process.Newsletter() 
>>> task.run 
    <bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)> 
>>> task.run() 
>>> task.run == None 
False 
>>> task.run() == None 
True 

このコードスニペットは、それが何であるかを考えていません。

ここでオフベースですか?

答えて

0

まだ理論的には.complete()をオーバーライドする必要があると思いますが、それはなぜか分かりませんが、プロセスを実行した後にファイルを一括転送する方法を探しているのであれば実行可能な解決法は、.run()メソッド内で転送を行うことだけです。

関連する問題