私はLuigiを使用していくつかのタスクを実行していますが、出力を標準化されたファイルの場所に一括転送する必要があります。私はこれを行うには上書きcomplete()
方法でWrapperTaskを書いた:WrapperTaskの.complete()を再試行してください
from luigi.task import flatten
class TaskX(luigi.WrapperTask):
date = luigi.DateParameter()
client = luigi.s3.S3Client()
def requires(self):
yield TaskA(date=self.date)
yield TaskB(date=self.date)
def complete(self):
tasks_complete = all(r.complete() for r in flatten(self.requires()))
## at the end of everything, batch copy the files
if tasks_complete:
self.client.copy('current-old', 'current')
return True
else:
return False
if __name__ == "__main__":
luigi.run()
が、私は、トラブルのプロセスが実際に終了したときに呼び出されるcomplete()
の条件付きの一部を取得を抱えています。
asynchronous behaviorが他の人によって指摘されていると思われますが、修正方法がわかりません。
私は、これらのコマンドラインパラメータとルイージを実行しようとしました:
$ PYTHONPATH="" luigi --module x TaskX --worker-retry-external-task
しかし、それは正しく動作していないようです。このタイプのタスクを処理するための正しいアプローチですか?
また、私は興味があります - --worker-retry-external-task
コマンドの経験がありましたか?私はそれを理解するいくつかの問題を抱えています。 source codeで
、
def _is_external(task):
return task.run is None or task.run == NotImplemented
はLuigiTaskがWrapperTask
はないrun()
方法を有しているか否かを判断するために呼び出されます。したがって、私は--retry-external-task
フラグが完了するまでこのためにcomplete()
を再試行し、アクションを実行することを期待しています。しかし、通訳の周りを遊んでいるだけで、私はそれを信じるようになります。
>>> import luigi_newsletter_process
>>> task = luigi_newsletter_process.Newsletter()
>>> task.run
<bound method Newsletter.run of Newsletter(date=2016-06-22, use_s3=True)>
>>> task.run()
>>> task.run == None
False
>>> task.run() == None
True
このコードスニペットは、それが何であるかを考えていません。
ここでオフベースですか?