生データを小さなファイルに分割する前処理タスクluigi
があります。これらのファイルは、実際のパイプラインで処理されます。実行時にluigiの依存関係が変更される
パラメータに関しては、パラメータとして1つの前処理されたファイルIDを持つ各パイプラインが必要です。ただし、このファイルIDは前処理ステップでのみ生成されるため、実行時にのみ認識されます。 、Experiment
は
最初は、何らかの形で第二の文書
への生データの分割を要求すべきである
import luigi import subprocess import random class GenPipelineFiles(luigi.Task): input_file = luigi.Parameter() def requires(self): pass def output(self): for i in range(random.randint(0,10)): yield luigi.LocalTarget("output/{}_{}.txt".format(self.input_file, i)) def run(self): for iout in self.output: command = "touch {}".format(iout.fname) subprocess.call(command, shell=True) class RunPipelineOnSmallChunk(luigi.Task): pass class Experiment(luigi.WrapperTask): input_file = luigi.Parameter(default="ex") def requires(self): file_ids = GenPipelineFiles(input_file=self.input_file) for file_id in file_ids: yield RunPipelineOnSmallChunk(directory=self.input_file, file_id=file_id) luigi.run()
wrapperタスク:私の考えを説明するために、私はこの-動作しないコードを提供します取得された前処理のファイルIDを持つ実際のパイプラインが必要です。
GenPipelineFiles
で出力ファイルの乱数が、これはExperiment
のrequires
にハードコーディングすることができないことを示しています。
おそらく、これに関連する質問は、luigi
タスクには、入力ターゲットと出力ターゲットがそれぞれ1つしかないということです。おそらくGenPipelineFiles
で複数の出力をモデル化する方法に関する注釈も問題を解決する可能性があります。
あなたは、この時点で取得しているエラーを説明できますか? –
luigi依存グラフは、 'requires'関数の戻り値に基づいて作成されます。ここでは、GePipelineFilesは決して返されないため、スケジュールされません。このコードは私の実際のコードではなく、決してエラーなしで実行されることはありません。それは私が直面している依存関係の問題を説明するためだけのものです –